Skip to content

Commit

Permalink
[ISSUE #3897]Admin, RocketMQAdmin, StandaloneAdmin and AbstractRmqAdm…
Browse files Browse the repository at this point in the history
…in can be more clearer. (#3898)

* Modify Admin's hierarchy.

* Fix CI error.
  • Loading branch information
pandaapo authored Jun 6, 2023
1 parent e748f24 commit d374551
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 143 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.api.admin;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import io.cloudevents.CloudEvent;

public abstract class AbstractAdmin implements Admin {

private final AtomicBoolean started;

public AbstractAdmin(AtomicBoolean started) {
this.started = started;
}

@Override
public boolean isStarted() {
return started.get();
}

@Override
public boolean isClosed() {
return !started.get();
}

@Override
public void start() {
started.set(true);
}

@Override
public void shutdown() {
started.set(false);
}

@Override
public void init(Properties properties) throws Exception {

}

@Override
public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
return null;
}

@Override
public void publish(CloudEvent cloudEvent) throws Exception {
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,66 @@

package org.apache.eventmesh.storage.rocketmq.admin;

import org.apache.eventmesh.api.admin.Admin;
import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;
import org.apache.eventmesh.common.config.ConfigService;
import org.apache.eventmesh.storage.rocketmq.config.ClientConfiguration;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import io.cloudevents.CloudEvent;

public class RocketMQAdmin extends AbstractRmqAdmin implements Admin {
public class RocketMQAdmin extends AbstractAdmin {

private final AtomicBoolean isStarted;
private final RPCHook rpcHook;

protected String nameServerAddr;

protected String clusterName;

private int numOfQueue = 4;
private int queuePermission = 6;

public RocketMQAdmin() {
isStarted = new AtomicBoolean(false);
}
super(new AtomicBoolean(false));

@Override
public boolean isStarted() {
return isStarted.get();
}

@Override
public boolean isClosed() {
return !isStarted.get();
}

@Override
public void start() {
isStarted.compareAndSet(false, true);
}

@Override
public void shutdown() {
isStarted.compareAndSet(true, false);
}

@Override
public void init(Properties properties) {
ConfigService configService = ConfigService.getInstance();
ClientConfiguration clientConfiguration = configService.buildConfigInstance(ClientConfiguration.class);

nameServerAddr = clientConfiguration.getNamesrvAddr();
clusterName = clientConfiguration.getClusterName();
String accessKey = clientConfiguration.getAccessKey();
String secretKey = clientConfiguration.getSecretKey();
rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}

@Override
public List<TopicProperties> getTopic() throws Exception {
DefaultMQAdminExt adminExt = createMQAdminExt();
try {
List<TopicProperties> result = new ArrayList<>();

Set<String> topicList = getAdminExt().fetchAllTopicList().getTopicList();
adminExt.start();
Set<String> topicList = adminExt.fetchAllTopicList().getTopicList();
for (String topic : topicList) {
long messageCount = 0;
TopicStatsTable topicStats = getAdminExt().examineTopicStats(topic);
TopicStatsTable topicStats = adminExt.examineTopicStats(topic);
HashMap<MessageQueue, TopicOffset> offsetTable = topicStats.getOffsetTable();
for (TopicOffset topicOffset : offsetTable.values()) {
messageCount += topicOffset.getMaxOffset() - topicOffset.getMinOffset();
Expand All @@ -94,7 +89,7 @@ public List<TopicProperties> getTopic() throws Exception {
result.sort(Comparator.comparing(t -> t.name));
return result;
} finally {
shutdownExt();
adminExt.shutdown();
}
}

Expand All @@ -103,18 +98,20 @@ public void createTopic(String topicName) throws Exception {
if (StringUtils.isBlank(topicName)) {
throw new Exception("Topic name can not be blank");
}
DefaultMQAdminExt adminExt = createMQAdminExt();
try {
Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(getAdminExt(), clusterName);
adminExt.start();
Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
for (String masterAddress : brokerAddress) {
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName(topicName);
topicConfig.setReadQueueNums(numOfQueue);
topicConfig.setWriteQueueNums(numOfQueue);
topicConfig.setPerm(queuePermission);
getAdminExt().createAndUpdateTopicConfig(masterAddress, topicConfig);
adminExt.createAndUpdateTopicConfig(masterAddress, topicConfig);
}
} finally {
shutdownExt();
adminExt.shutdown();
}
}

Expand All @@ -123,22 +120,22 @@ public void deleteTopic(String topicName) throws Exception {
if (StringUtils.isBlank(topicName)) {
throw new Exception("Topic name can not be blank.");
}
DefaultMQAdminExt adminExt = createMQAdminExt();
try {
Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(getAdminExt(), clusterName);
getAdminExt().deleteTopicInBroker(brokerAddress, topicName);
adminExt.start();
Set<String> brokerAddress = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
adminExt.deleteTopicInBroker(brokerAddress, topicName);
} finally {
shutdownExt();
adminExt.shutdown();
}
}

@Override
public List<CloudEvent> getEvent(String topicName, int offset, int length) {
return null;
private DefaultMQAdminExt createMQAdminExt() {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
String groupId = UUID.randomUUID().toString();
adminExt.setAdminExtGroup("admin_ext_group-" + groupId);
adminExt.setNamesrvAddr(nameServerAddr);
return adminExt;
}

@Override
public void publish(CloudEvent cloudEvent) {
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ public void deleteTopic(String topicName) throws Exception {
}

@Override
public List<CloudEvent> getEvent(String topicName, int offset, int length) {
public List<CloudEvent> getEvent(String topicName, int offset, int length) throws Exception {
return admin.getEvent(topicName, offset, length);
}

@Override
public void publish(CloudEvent cloudEvent) {
public void publish(CloudEvent cloudEvent) throws Exception {
admin.publish(cloudEvent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.eventmesh.storage.standalone.admin;

import org.apache.eventmesh.api.admin.Admin;
import org.apache.eventmesh.api.admin.AbstractAdmin;
import org.apache.eventmesh.api.admin.TopicProperties;
import org.apache.eventmesh.storage.standalone.broker.MessageQueue;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
Expand All @@ -26,45 +26,18 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import io.cloudevents.CloudEvent;

public class StandaloneAdmin implements Admin {
public class StandaloneAdmin extends AbstractAdmin {

private final AtomicBoolean isStarted;
private final StandaloneBroker standaloneBroker;

public StandaloneAdmin() {
super(new AtomicBoolean(false));
this.standaloneBroker = StandaloneBroker.getInstance();
this.isStarted = new AtomicBoolean(false);
}

@Override
public boolean isStarted() {
return isStarted.get();
}

@Override
public boolean isClosed() {
return !isStarted.get();
}

@Override
public void start() {
isStarted.compareAndSet(false, true);
}

@Override
public void shutdown() {
isStarted.compareAndSet(true, false);
}

@Override
public void init(Properties properties) throws Exception {

}

@Override
Expand Down

0 comments on commit d374551

Please sign in to comment.