From a87c47c8474428a9ce79ad1e63f73befda8394a0 Mon Sep 17 00:00:00 2001 From: echooymxq Date: Fri, 27 Feb 2026 15:09:24 +0800 Subject: [PATCH] [ISSUE #10114] Support delete consumer offset by consumer group and topic. --- .../broker/offset/ConsumerOffsetManager.java | 10 ++ .../processor/AdminBrokerProcessor.java | 19 ++++ .../offset/ConsumerOffsetManagerTest.java | 23 ++++ .../processor/AdminBrokerProcessorTest.java | 12 ++ .../rocketmq/client/impl/MQClientAPIImpl.java | 23 ++++ .../remoting/protocol/RequestCode.java | 2 + .../DeleteConsumerOffsetRequestHeader.java | 59 ++++++++++ .../tools/admin/DefaultMQAdminExt.java | 7 ++ .../tools/admin/DefaultMQAdminExtImpl.java | 6 + .../rocketmq/tools/admin/MQAdminExt.java | 4 + .../tools/command/MQAdminStartup.java | 2 + .../offset/DeleteConsumerOffsetCommand.java | 103 ++++++++++++++++++ .../DeleteConsumerOffsetCommandTest.java | 69 ++++++++++++ 13 files changed, 339 insertions(+) create mode 100644 remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteConsumerOffsetRequestHeader.java create mode 100644 tools/src/main/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommand.java create mode 100644 tools/src/test/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommandTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java index e062ceca96a..e94d7bb083c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java @@ -434,6 +434,16 @@ public void removeOffset(final String group) { "offsetTable={}, resetOffsetTable={}, pullOffsetTable={}", group, clearOffset, clearReset, clearPull); } + public void removeOffset(final String group, final String topic) { + String key = topic + TOPIC_GROUP_SEPARATOR + group; + ConcurrentMap removedOffset = this.offsetTable.remove(key); + this.resetOffsetTable.remove(key); + this.pullOffsetTable.remove(key); + removeConsumerOffset(key); + LOG.info("Remove consumer offset, group={}, topic={}, key={}, removedOffset={}", + group, topic, key, removedOffset); + } + public void assignResetOffset(String topic, String group, int queueId, long offset) { if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || queueId < 0 || offset < 0) { LOG.warn("Illegal arguments when assigning reset offset. Topic={}, group={}, queueId={}, offset={}", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 73aaa69e74a..5dcb0af934a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -156,6 +156,7 @@ import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteAclRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader; @@ -310,6 +311,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, return this.getAllSubscriptionGroup(ctx, request); case RequestCode.DELETE_SUBSCRIPTIONGROUP: return this.deleteSubscriptionGroup(ctx, request); + case RequestCode.DELETE_CONSUMER_OFFSET: + return this.deleteConsumerOffset(ctx, request); case RequestCode.GET_TOPIC_STATS_INFO: return this.getTopicStatsInfo(ctx, request); case RequestCode.GET_CONSUMER_CONNECTION_LIST: @@ -1714,6 +1717,22 @@ private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, return response; } + private RemotingCommand deleteConsumerOffset(ChannelHandlerContext ctx, + RemotingCommand request) throws RemotingCommandException { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + DeleteConsumerOffsetRequestHeader requestHeader = + request.decodeCommandCustomHeader(DeleteConsumerOffsetRequestHeader.class); + + LOGGER.info("deleteConsumerOffset, caller={}, group={}, topic={}", + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(), requestHeader.getTopic()); + + this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic()); + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java index 3ddd369c7fb..c42082a3069 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java @@ -84,6 +84,29 @@ public void removeOffsetByGroupTest() { Assert.assertEquals(-1L, consumerOffsetManager.queryPullOffset(group, topic, 0)); } + @Test + public void removeOffsetByGroupAndTopic_NotExist() { + consumerOffsetManager.removeOffset("NonExistGroup", "NonExistTopic"); + assertThat(consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue(); + } + + @Test + public void removeOffsetByGroupAndTopic_Exist() { + String topic = "TestTopic"; + String group = "TestGroup"; + String key = topic + TOPIC_GROUP_SEPARATOR + group; + Mockito.when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig()); + consumerOffsetManager.commitOffset("Commit", group, topic, 0, 100); + consumerOffsetManager.assignResetOffset(topic, group, 0, 100); + consumerOffsetManager.commitPullOffset("Pull", group, topic, 0, 100); + assertThat(consumerOffsetManager.getOffsetTable().containsKey(key)).isTrue(); + + consumerOffsetManager.removeOffset(group, topic); + assertThat(consumerOffsetManager.getOffsetTable().containsKey(key)).isFalse(); + Assert.assertEquals(-1L, consumerOffsetManager.queryOffset(group, topic, 0)); + Assert.assertEquals(-1L, consumerOffsetManager.queryPullOffset(group, topic, 0)); + } + @Test public void testOffsetPersistInMemory() { ConcurrentMap> offsetTable = consumerOffsetManager.getOffsetTable(); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index 656c783e1f4..436131e2851 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -88,6 +88,7 @@ import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteAclRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader; @@ -1722,6 +1723,17 @@ private RemotingCommand createUpdateBrokerConfigCommand() { return request; } + @Test + public void testDeleteConsumerOffset() throws RemotingCommandException { + DeleteConsumerOffsetRequestHeader requestHeader = new DeleteConsumerOffsetRequestHeader(); + requestHeader.setConsumerGroup("testGroup"); + requestHeader.setTopic("testTopic"); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_CONSUMER_OFFSET, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + private boolean notToBeExecuted() { return MixAll.isMac(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 8294ffd422f..7fe4d48bbb0 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -167,6 +167,7 @@ import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.CreateUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteAclRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteConsumerOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader; @@ -2257,6 +2258,28 @@ public void deleteSubscriptionGroup(final String addr, final String groupName, f throw new MQClientException(response.getCode(), response.getRemark()); } + public void deleteConsumerOffset(final String addr, final String group, final String topic, + final long timeoutMillis) + throws RemotingException, InterruptedException, MQClientException { + DeleteConsumerOffsetRequestHeader requestHeader = new DeleteConsumerOffsetRequestHeader(); + requestHeader.setConsumerGroup(group); + requestHeader.setTopic(topic); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_CONSUMER_OFFSET, requestHeader); + + RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), + request, timeoutMillis); + assert response != null; + switch (response.getCode()) { + case ResponseCode.SUCCESS: { + return; + } + default: + break; + } + + throw new MQClientException(response.getCode(), response.getRemark()); + } + public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader(); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java index b32dbbc87ea..149b9d8331f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java @@ -229,6 +229,8 @@ public class RequestCode { public static final int LITE_PULL_MESSAGE = 361; public static final int RECALL_MESSAGE = 370; + public static final int DELETE_CONSUMER_OFFSET = 380; + public static final int QUERY_ASSIGNMENT = 400; public static final int SET_MESSAGE_REQUEST_MODE = 401; public static final int GET_ALL_MESSAGE_REQUEST_MODE = 402; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteConsumerOffsetRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteConsumerOffsetRequestHeader.java new file mode 100644 index 00000000000..0403202e7bd --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/DeleteConsumerOffsetRequestHeader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.protocol.header; + +import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.action.RocketMQAction; +import org.apache.rocketmq.common.resource.ResourceType; +import org.apache.rocketmq.common.resource.RocketMQResource; +import org.apache.rocketmq.remoting.annotation.CFNotNull; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.protocol.RequestCode; +import org.apache.rocketmq.remoting.rpc.RpcRequestHeader; + +@RocketMQAction(value = RequestCode.DELETE_CONSUMER_OFFSET, action = Action.DELETE) +public class DeleteConsumerOffsetRequestHeader extends RpcRequestHeader { + @CFNotNull + @RocketMQResource(ResourceType.GROUP) + private String consumerGroup; + + @CFNotNull + @RocketMQResource(ResourceType.TOPIC) + private String topic; + + @Override + public void checkFields() throws RemotingCommandException { + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } +} + diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index d29ffad2540..74363cf6a53 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -430,6 +430,13 @@ public void deleteSubscriptionGroup(String addr, defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName, removeOffset); } + @Override + public void deleteConsumerOffset(String addr, String group, + String topic) throws RemotingException, MQBrokerException, InterruptedException, + MQClientException { + defaultMQAdminExtImpl.deleteConsumerOffset(addr, group, topic); + } + @Override public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index d96b4b03bcc..f8748e9feee 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -776,6 +776,12 @@ public void deleteSubscriptionGroup(String addr, String groupName, this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, removeOffset, timeoutMillis); } + @Override + public void deleteConsumerOffset(String addr, String group, + String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + this.mqClientInstance.getMQClientAPIImpl().deleteConsumerOffset(addr, group, topic, timeoutMillis); + } + @Override public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 980ff5acdb4..81743cd8c64 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -212,6 +212,10 @@ void deleteSubscriptionGroup(final String addr, String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; + void deleteConsumerOffset(final String addr, String group, + String topic) throws RemotingException, MQBrokerException, + InterruptedException, MQClientException; + void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java index d7054933e10..9cf1d532f85 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java @@ -105,6 +105,7 @@ import org.apache.rocketmq.tools.command.namesrv.UpdateNamesrvConfigCommand; import org.apache.rocketmq.tools.command.namesrv.WipeWritePermSubCommand; import org.apache.rocketmq.tools.command.offset.CloneGroupOffsetCommand; +import org.apache.rocketmq.tools.command.offset.DeleteConsumerOffsetCommand; import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand; import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand; import org.apache.rocketmq.tools.command.producer.ProducerSubCommand; @@ -240,6 +241,7 @@ public static void initCommand() { initCommand(new AddWritePermSubCommand()); initCommand(new ResetOffsetByTimeCommand()); initCommand(new SkipAccumulationSubCommand()); + initCommand(new DeleteConsumerOffsetCommand()); initCommand(new UpdateOrderConfCommand()); initCommand(new CleanExpiredCQSubCommand()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommand.java new file mode 100644 index 00000000000..a0e0919461d --- /dev/null +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommand.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.offset; + +import java.util.Set; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; +import org.apache.rocketmq.tools.command.CommandUtil; +import org.apache.rocketmq.tools.command.SubCommand; +import org.apache.rocketmq.tools.command.SubCommandException; + +public class DeleteConsumerOffsetCommand implements SubCommand { + + @Override + public String commandName() { + return "deleteConsumerOffset"; + } + + @Override + public String commandDesc() { + return "Delete consumer offset for specified consumer group and topic."; + } + + @Override + public Options buildCommandlineOptions(Options options) { + Option opt = new Option("g", "consumerGroup", true, "consumer group name (required)"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("t", "topic", true, "topic name (required)"); + opt.setRequired(true); + options.addOption(opt); + + opt = new Option("b", "brokerAddr", true, "delete consumer offset from which broker"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "clusterName", true, "delete consumer offset from which cluster"); + opt.setRequired(false); + options.addOption(opt); + + return options; + } + + @Override + public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { + DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); + defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); + + if (commandLine.hasOption('n')) { + defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim()); + } + try { + String group = commandLine.getOptionValue('g').trim(); + String topic = commandLine.getOptionValue('t').trim(); + defaultMQAdminExt.start(); + + if (commandLine.hasOption('b')) { + String addr = commandLine.getOptionValue('b').trim(); + + defaultMQAdminExt.deleteConsumerOffset(addr, group, topic); + System.out.printf("delete consumer offset for group [%s] topic [%s] from broker [%s] success.%n", + group, topic, addr); + return; + } else if (commandLine.hasOption('c')) { + String clusterName = commandLine.getOptionValue('c').trim(); + + Set masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); + for (String master : masterSet) { + defaultMQAdminExt.deleteConsumerOffset(master, group, topic); + System.out.printf( + "delete consumer offset for group [%s] topic [%s] from broker [%s] in cluster [%s] success.%n", + group, topic, master, clusterName); + } + return; + } + ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); + } catch (Exception e) { + throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); + } finally { + defaultMQAdminExt.shutdown(); + } + } +} + diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommandTest.java new file mode 100644 index 00000000000..5df6b6aab6a --- /dev/null +++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/DeleteConsumerOffsetCommandTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.tools.command.offset; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.rocketmq.srvutil.ServerUtil; +import org.apache.rocketmq.tools.command.SubCommandException; +import org.apache.rocketmq.tools.command.server.NameServerMocker; +import org.apache.rocketmq.tools.command.server.ServerResponseMocker; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class DeleteConsumerOffsetCommandTest { + + private ServerResponseMocker brokerMocker; + + private ServerResponseMocker nameServerMocker; + + @Before + public void before() { + brokerMocker = startOneBroker(); + nameServerMocker = NameServerMocker.startByDefaultConf(brokerMocker.listenPort()); + } + + @After + public void after() { + brokerMocker.shutdown(); + nameServerMocker.shutdown(); + } + + @Test + public void testExecuteByBrokerAddr() throws SubCommandException { + DeleteConsumerOffsetCommand cmd = new DeleteConsumerOffsetCommand(); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + String[] subargs = new String[] { + "-g", "testGroup", + "-t", "testTopic", + "-b", "127.0.0.1:" + brokerMocker.listenPort(), + "-n", "localhost:" + nameServerMocker.listenPort() + }; + final CommandLine commandLine = + ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, + cmd.buildCommandlineOptions(options), new DefaultParser()); + cmd.execute(commandLine, options, null); + } + + private ServerResponseMocker startOneBroker() { + // start broker with empty success response + return ServerResponseMocker.startServer(new byte[0]); + } +} +