diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index 2e3134016c7..4317e53df9e 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -94,8 +94,8 @@ private void syncTopicConfig() { Map.Entry entry = iterator.next(); if (!newTopicConfigTable.containsKey(entry.getKey())) { iterator.remove(); + topicConfigManager.deleteTopicConfig(entry.getKey()); } - topicConfigManager.deleteTopicConfig(entry.getKey()); } //update @@ -189,8 +189,8 @@ private void syncSubscriptionGroupConfig() { Map.Entry configEntry = iterator.next(); if (!newSubscriptionGroupTable.containsKey(configEntry.getKey())) { iterator.remove(); + subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } - subscriptionGroupManager.deleteSubscriptionGroupConfig(configEntry.getKey()); } // update newSubscriptionGroupTable.values().forEach(subscriptionGroupManager::putSubscriptionGroupConfig); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java index c9461c42240..ccd3ec36afa 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/slave/SlaveSynchronizeTest.java @@ -36,6 +36,7 @@ import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.body.ConsumerOffsetSerializeWrapper; import org.apache.rocketmq.remoting.protocol.body.MessageRequestModeSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; import org.apache.rocketmq.store.MessageStore; @@ -205,4 +206,72 @@ private TimerMetrics.TimerMetricsSerializeWrapper createTimerMetricsWrapper() { wrapper.setDataVersion(dataVersion); return wrapper; } + + @Test + public void testSyncTopicConfig_withDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { + // Setup: Slave has existing topics that should be deleted + TopicConfig existingTopic1 = new TopicConfig("ToBeDeleted1"); + TopicConfig existingTopic2 = new TopicConfig("ToBeDeleted2"); + TopicConfig existingTopic3 = new TopicConfig("ToBeKept"); + + when(topicConfigManager.getTopicConfigTable()).thenReturn(new ConcurrentHashMap<>()); + topicConfigManager.getTopicConfigTable().put("ToBeDeleted1", existingTopic1); + topicConfigManager.getTopicConfigTable().put("ToBeDeleted2", existingTopic2); + topicConfigManager.getTopicConfigTable().put("ToBeKept", existingTopic3); + + // Master returns only some topics (not the ones to be deleted) + TopicConfig newTopic = new TopicConfig("NewTopic"); + TopicConfigAndMappingSerializeWrapper wrapper = createTopicConfigWrapper(newTopic); + wrapper.getTopicConfigTable().put("ToBeKept", existingTopic3); + + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(wrapper); + when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); + when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(createSubscriptionGroupWrapper()); + when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); + when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); + when(topicConfigManager.getDataVersion()).thenReturn(new DataVersion()); + + slaveSynchronize.syncAll(); + + // Verify deleteTopicConfig was called for the removed topics + org.mockito.Mockito.verify(topicConfigManager).deleteTopicConfig("ToBeDeleted1"); + org.mockito.Mockito.verify(topicConfigManager).deleteTopicConfig("ToBeDeleted2"); + } + + @Test + public void testSyncSubscriptionGroupConfig_withDeletion() throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException, UnsupportedEncodingException, RemotingCommandException { + // Setup: Slave has existing subscription groups that should be deleted + SubscriptionGroupConfig existingGroup1 = new SubscriptionGroupConfig(); + existingGroup1.setGroupName("ToBeDeletedGroup1"); + + SubscriptionGroupConfig existingGroup2 = new SubscriptionGroupConfig(); + existingGroup2.setGroupName("ToBeDeletedGroup2"); + + SubscriptionGroupConfig existingGroup3 = new SubscriptionGroupConfig(); + existingGroup3.setGroupName("ToBeKeptGroup"); + + when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new ConcurrentHashMap<>()); + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeDeletedGroup1", existingGroup1); + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeDeletedGroup2", existingGroup2); + subscriptionGroupManager.getSubscriptionGroupTable().put("ToBeKeptGroup", existingGroup3); + + // Master returns only some subscription groups (not the ones to be deleted) + SubscriptionGroupWrapper wrapper = createSubscriptionGroupWrapper(); + wrapper.getSubscriptionGroupTable().put("ToBeKeptGroup", existingGroup3); + + when(brokerOuterAPI.getAllTopicConfig(anyString())).thenReturn(createTopicConfigWrapper(new TopicConfig("NewTopic"))); + when(brokerOuterAPI.getAllConsumerOffset(anyString())).thenReturn(createConsumerOffsetWrapper()); + when(brokerOuterAPI.getAllDelayOffset(anyString())).thenReturn(""); + when(brokerOuterAPI.getAllSubscriptionGroupConfig(anyString())).thenReturn(wrapper); + when(brokerOuterAPI.getAllMessageRequestMode(anyString())).thenReturn(createMessageRequestModeWrapper()); + when(brokerOuterAPI.getTimerMetrics(anyString())).thenReturn(createTimerMetricsWrapper()); + when(subscriptionGroupManager.getDataVersion()).thenReturn(new DataVersion()); + + slaveSynchronize.syncAll(); + + // Verify deleteSubscriptionGroupConfig was called for the removed groups + org.mockito.Mockito.verify(subscriptionGroupManager).deleteSubscriptionGroupConfig("ToBeDeletedGroup1"); + org.mockito.Mockito.verify(subscriptionGroupManager).deleteSubscriptionGroupConfig("ToBeDeletedGroup2"); + } }