Skip to content

Commit

Permalink
[ISSUE #418]Refactor the plugin load code
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jul 7, 2021
1 parent c18adf7 commit c63fff2
Show file tree
Hide file tree
Showing 19 changed files with 119 additions and 159 deletions.
13 changes: 6 additions & 7 deletions docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,17 @@ sh start.sh
```
> 注:如果操作系统为Windows, 可能需要将文件分隔符换成\
**2.3.3 配置build.gradle文件**
**2.3.3 配置插件**

通过修改dependencies,compile project 项来指定项目启动后加载的插件
`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件

修改`eventmesh-starter`模块下面的`build.gradle`文件
修改`confPath`目录下面的`eventMesh.properties`文件

加载**RocketMQ**插件配置:
加载**RocketMQ Connector**插件配置:

```java
dependencies {
compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
}
#connector plugin
eventMesh.connector.plugin.type=rocketmq
```

**2.3.4 启动运行**
Expand Down
11 changes: 5 additions & 6 deletions docs/en/instructions/eventmesh-runtime-quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,17 @@ related interface and implementation class under /main/resources/meta-inf/servic
```
> ps: If you use Windows, you may need to replace the file separator to \
**2.3.3 Configure build.gradle file**
**2.3.3 Configure plugin**

Specify the connector that will be loaded after the project start with updating compile project item in dependencies
Specify the connector plugin that will be loaded after the project start by declaring in `eventMesh.properties`

update `build.gradle` file under the `eventmesh-starter` module
Modify the `eventMesh.properties` file in the `confPath` directory

load **rocketmq connector** configuration:

```java
dependencies {
compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
}
#connector plugin
eventMesh.connector.plugin.type=rocketmq
```

**2.3.4 Run**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,18 @@

package org.apache.eventmesh.common.config;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;

import com.google.common.base.Preconditions;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;

public class CommonConfiguration {
public String eventMeshEnv = "P";
public String eventMeshIDC = "FT";
public String eventMeshCluster = "LS";
public String eventMeshName = "";
public String sysID = "5477";

public String eventMeshConnectorPluginType = "rocketmq";

public String namesrvAddr = "";
public String clientUserName = "username";
Expand Down Expand Up @@ -84,8 +77,11 @@ public void init() {

eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
if (StringUtils.isBlank(eventMeshServerIp)) {
eventMeshServerIp = getLocalAddr();
eventMeshServerIp = IPUtil.getLocalAddress();
}

eventMeshConnectorPluginType = configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
}
}

Expand All @@ -105,94 +101,7 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills";

public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
}

public static String getLocalAddr() {
//priority of networkInterface when generating client ip
String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
ArrayList<String> preferList = new ArrayList<String>();
for (String eth : priority.split("<")) {
preferList.add(eth);
}
NetworkInterface preferNetworkInterface = null;

try {
Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
while (enumeration1.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration1.nextElement();
if (!preferList.contains(networkInterface.getName())) {
continue;
} else if (preferNetworkInterface == null) {
preferNetworkInterface = networkInterface;
}
//get the networkInterface that has higher priority
else if (preferList.indexOf(networkInterface.getName())
> preferList.indexOf(preferNetworkInterface.getName())) {
preferNetworkInterface = networkInterface;
}
}

// Traversal Network interface to get the first non-loopback and non-private address
ArrayList<String> ipv4Result = new ArrayList<String>();
ArrayList<String> ipv6Result = new ArrayList<String>();

if (preferNetworkInterface != null) {
final Enumeration<InetAddress> en = preferNetworkInterface.getInetAddresses();
getIpResult(ipv4Result, ipv6Result, en);
} else {
Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
while (enumeration.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration.nextElement();
final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
getIpResult(ipv4Result, ipv6Result, en);
}
}

// prefer ipv4
if (!ipv4Result.isEmpty()) {
for (String ip : ipv4Result) {
if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
continue;
}

return ip;
}

return ipv4Result.get(ipv4Result.size() - 1);
} else if (!ipv6Result.isEmpty()) {
return ipv6Result.get(0);
}
//If failed to find,fall back to localhost
final InetAddress localHost = InetAddress.getLocalHost();
return normalizeHostAddress(localHost);
} catch (SocketException e) {
e.printStackTrace();
} catch (UnknownHostException e) {
e.printStackTrace();
}

return null;
}

public static String normalizeHostAddress(final InetAddress localHost) {
if (localHost instanceof Inet6Address) {
return "[" + localHost.getHostAddress() + "]";
} else {
return localHost.getHostAddress();
}
}

private static void getIpResult(ArrayList<String> ipv4Result, ArrayList<String> ipv6Result,
Enumeration<InetAddress> en) {
while (en.hasMoreElements()) {
final InetAddress address = en.nextElement();
if (!address.isLoopbackAddress()) {
if (address instanceof Inet6Address) {
ipv6Result.add(normalizeHostAddress(address));
} else {
ipv4Result.add(normalizeHostAddress(address));
}
}
}
public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
}
}
4 changes: 2 additions & 2 deletions eventmesh-connector-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ List open_message = [
]

dependencies {
implementation open_message,project(":eventmesh-common")
testImplementation open_message,project(":eventmesh-common")
implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.openmessaging.api.Message;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI
public interface MeshMQPushConsumer extends Consumer {

void init(Properties keyValue) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import io.openmessaging.api.SendCallback;

import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI
public interface MeshMQProducer extends Producer {

void init(Properties properties) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
4 changes: 2 additions & 2 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ List open_message = [


dependencies {
implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
}
5 changes: 4 additions & 1 deletion eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106
eventMesh.server.registry.registerIntervalInMills=10000
eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#auto-ack
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5

#connector plugin
eventMesh.connector.plugin.type=rocketmq
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;

import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
Expand All @@ -35,6 +34,14 @@ public class MQConsumerWrapper extends MQWrapper {

protected MeshMQPushConsumer meshMQPushConsumer;

public MQConsumerWrapper(String connectorPluginType) {
this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType);
if (meshMQPushConsumer == null) {
logger.error("can't load the meshMQPushConsumer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
}
}

public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
}
Expand All @@ -44,24 +51,10 @@ public void unsubscribe(String topic) throws Exception {
}

public synchronized void init(Properties keyValue) throws Exception {
meshMQPushConsumer = getMeshMQPushConsumer();
if (meshMQPushConsumer == null) {
logger.error("can't load the meshMQPushConsumer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
}

meshMQPushConsumer.init(keyValue);
inited.compareAndSet(false, true);
}

private MeshMQPushConsumer getMeshMQPushConsumer() {
ServiceLoader<MeshMQPushConsumer> meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class);
if (meshMQPushConsumerServiceLoader.iterator().hasNext()) {
return meshMQPushConsumerServiceLoader.iterator().next();
}
return null;
}

public synchronized void start() throws Exception {
meshMQPushConsumer.start();
started.compareAndSet(false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,21 @@ public class MQProducerWrapper extends MQWrapper {

protected MeshMQProducer meshMQProducer;

public synchronized void init(Properties keyValue) throws Exception {
if (inited.get()) {
return;
}

meshMQProducer = getSpiMeshMQProducer();
public MQProducerWrapper(String connectorPluginType) {
this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType);
if (meshMQProducer == null) {
logger.error("can't load the meshMQProducer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
}
meshMQProducer.init(keyValue);

inited.compareAndSet(false, true);
}

private MeshMQProducer getSpiMeshMQProducer() {
ServiceLoader<MeshMQProducer> meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class);
if (meshMQProducerServiceLoader.iterator().hasNext()) {
return meshMQProducerServiceLoader.iterator().next();
public synchronized void init(Properties keyValue) throws Exception {
if (inited.get()) {
return;
}
return null;
meshMQProducer.init(keyValue);

inited.compareAndSet(false, true);
}

public synchronized void start() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.eventmesh.runtime.core.plugin;

import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

public class PluginFactory {

public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName);
}

public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName);
}

private static <T> T getPlugin(Class<T> pluginType, String pluginName) {
return EventMeshExtensionFactory.getExtension(pluginType, pluginName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ public class EventMeshConsumer {

private ConsumerGroupConf consumerGroupConf;

private MQConsumerWrapper persistentMqConsumer = new MQConsumerWrapper();
private MQConsumerWrapper persistentMqConsumer;

private MQConsumerWrapper broadcastMqConsumer = new MQConsumerWrapper();
private MQConsumerWrapper broadcastMqConsumer;

public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) {
this.eventMeshHTTPServer = eventMeshHTTPServer;
this.consumerGroupConf = consumerGroupConf;
this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType);
}

private MessageHandler httpMessageHandler;
Expand Down
Loading

0 comments on commit c63fff2

Please sign in to comment.