Skip to content

Commit

Permalink
[ISSUE #5022] Optimize DefaultMQAdminExtImpl (#5023)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangjidi <[email protected]>
  • Loading branch information
zhangjidi2016 and zhangjidi authored Sep 8, 2022
1 parent f1e95cd commit ad72dbe
Showing 1 changed file with 16 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.DEFAULT_PRODUCER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.TOOLS_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.SCHEDULE_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.FILTERSRV_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.MONITOR_CONSUMER_GROUP);
SYSTEM_GROUP_SET.add(MixAll.CLIENT_INNER_PRODUCER_GROUP);
Expand Down Expand Up @@ -349,7 +351,7 @@ public AdminToolResult doExecute() throws Exception {
final TopicStatsTable topicStatsTable = new TopicStatsTable();
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);

if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return AdminToolResult.success(topicStatsTable);
}
final CountDownLatch latch = new CountDownLatch(topicRouteData.getBrokerDatas().size());
Expand All @@ -364,7 +366,7 @@ public void run() {
topicStatsTable.getOffsetTable().putAll(tst.getOffsetTable());
}
} catch (Exception e) {
log.error("getTopicStatsInfo error. topic=" + topic, e);
log.error("getTopicStatsInfo error. topic={}", topic, e);
} finally {
latch.countDown();
}
Expand Down Expand Up @@ -508,7 +510,7 @@ public AdminToolResult doExecute() throws Exception {
continue;
}
}
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
}

Expand All @@ -527,7 +529,7 @@ public void run() {
consumerTpsMap.put(addr, consumeStats.getConsumeTps());
}
} catch (Exception e) {
log.error("getTopicStatsInfo error. topic=" + topic, e);
log.error("getConsumeStats error. topic={}, consumerGroup={}", topic, consumerGroup, e);
} finally {
latch.countDown();
}
Expand Down Expand Up @@ -719,7 +721,7 @@ public void run() {
mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
successList.add(addr);
} catch (Exception e) {
log.error("deleteTopicInBrokerConcurrent error. topic=" + topic + ", host=" + addr, e);
log.error("deleteTopicInBroker error. topic={}, broker={}", topic, addr, e);
failureList.add(addr);
} finally {
latch.countDown();
Expand Down Expand Up @@ -851,7 +853,7 @@ public AdminToolResult<BrokerOperatorResult> resetOffsetNewConcurrent(final Stri
@Override
public AdminToolResult doExecute() throws Exception {
TopicRouteData topicRouteData = examineTopicRouteInfo(topic);
if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "topic router info not found");
}
final Map<String, QueueData> topicRouteMap = new HashMap<String, QueueData>();
Expand Down Expand Up @@ -1037,10 +1039,7 @@ public SubscriptionData querySubscription(String group,
if (addr != null) {
return this.mqClientInstance.getMQClientAPIImpl().querySubscriptionByConsumer(addr, group, topic, timeoutMillis);
}

break;
}

return null;
}

Expand Down Expand Up @@ -1071,7 +1070,7 @@ public AdminToolResult doExecute() throws Exception {
String retryTopic = MixAll.getRetryTopic(group);
TopicRouteData topicRouteData = examineTopicRouteInfo(retryTopic);

if (topicRouteData == null || topicRouteData.getBrokerDatas() == null || topicRouteData.getBrokerDatas().size() == 0) {
if (topicRouteData == null || CollectionUtils.isEmpty(topicRouteData.getBrokerDatas())) {
return AdminToolResult.failure(AdminToolsResultCodeEnum.TOPIC_ROUTE_INFO_NOT_EXIST, "router info not found.");
}
final TopicList result = new TopicList();
Expand All @@ -1087,7 +1086,7 @@ public void run() {
result.getTopicList().addAll(topicList.getTopicList());
}
} catch (Exception e) {
log.error("getTopicStatsInfo error. groupId=" + group, e);
log.error("queryTopicsByConsumer error. group={}", group, e);
} finally {
latch.countDown();
}
Expand Down Expand Up @@ -1137,7 +1136,7 @@ public void run() {
spanSet.addAll(mqClientInstance.getMQClientAPIImpl().queryConsumeTimeSpan(addr, topic, group, timeoutMillis));
}
} catch (Exception e) {
log.error("queryConsumeTimeSpan error. topic=" + topic, e);
log.error("queryConsumeTimeSpan error. topic={}, group={}", topic, group, e);
} finally {
latch.countDown();
}
Expand Down Expand Up @@ -1185,7 +1184,7 @@ public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo,
public boolean cleanExpiredConsumerQueueByAddr(
String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis);
log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
log.warn("clean expired ConsumeQueue on target broker={}, execute result={}", addr, result);
return result;
}

Expand All @@ -1195,7 +1194,7 @@ public boolean deleteExpiredCommitLog(
boolean result = false;
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (null == cluster || "".equals(cluster)) {
if (StringUtils.isEmpty(cluster)) {
for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
result = deleteExpiredCommitLogByCluster(clusterInfo, targetCluster);
}
Expand Down Expand Up @@ -1224,7 +1223,7 @@ public boolean deleteExpiredCommitLogByCluster(ClusterInfo clusterInfo,
public boolean deleteExpiredCommitLogByAddr(
String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = mqClientInstance.getMQClientAPIImpl().deleteExpiredCommitLog(addr, timeoutMillis);
log.warn("Delete expired CommitLog on target " + addr + " broker " + result);
log.warn("Delete expired CommitLog on target broker={}, execute result={}", addr, result);
return result;
}

Expand All @@ -1234,7 +1233,7 @@ public boolean cleanUnusedTopic(
boolean result = false;
try {
ClusterInfo clusterInfo = examineBrokerClusterInfo();
if (null == cluster || "".equals(cluster)) {
if (StringUtils.isEmpty(cluster)) {
for (String targetCluster : clusterInfo.retrieveAllClusterNames()) {
result = cleanUnusedTopicByCluster(clusterInfo, targetCluster);
}
Expand Down Expand Up @@ -1262,7 +1261,7 @@ public boolean cleanUnusedTopicByCluster(ClusterInfo clusterInfo,
public boolean cleanUnusedTopicByAddr(
String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException {
boolean result = mqClientInstance.getMQClientAPIImpl().cleanUnusedTopicByAddr(addr, timeoutMillis);
log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result);
log.warn("clean unused topic on target broker={}, execute result={}", addr, result);
return result;
}

Expand Down

0 comments on commit ad72dbe

Please sign in to comment.