Skip to content

Commit

Permalink
[ISSUE #418]Refactor the plugin load code
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Jul 7, 2021
1 parent c18adf7 commit 5cae93c
Show file tree
Hide file tree
Showing 20 changed files with 124 additions and 162 deletions.
16 changes: 8 additions & 8 deletions docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ sh start.sh
- eventmesh-runtime : eventmesh运行时模块
- eventmesh-sdk-java : eventmesh java客户端sdk
- eventmesh-starter : eventmesh本地启动运行项目入口
- eventmesh-spi : eventmesh SPI加载模块

> 注:插件模块遵循java spi机制,需要在对应模块中的/main/resources/META-INF/services 下配置相关接口与实现类的映射文件
> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件
**2.3.2 配置VM启动参数**

Expand All @@ -75,18 +76,17 @@ sh start.sh
```
> 注:如果操作系统为Windows, 可能需要将文件分隔符换成\
**2.3.3 配置build.gradle文件**
**2.3.3 配置插件**

通过修改dependencies,compile project 项来指定项目启动后加载的插件
`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件

修改`eventmesh-starter`模块下面的`build.gradle`文件
修改`confPath`目录下面的`eventMesh.properties`文件

加载**RocketMQ**插件配置:
加载**RocketMQ Connector**插件配置:

```java
dependencies {
compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
}
#connector plugin
eventMesh.connector.plugin.type=rocketmq
```

**2.3.4 启动运行**
Expand Down
16 changes: 8 additions & 8 deletions docs/en/instructions/eventmesh-runtime-quickstart.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ Same with 1.2
- eventmesh-runtime : eventmesh runtime module
- eventmesh-sdk-java : eventmesh java client sdk
- eventmesh-starter : eventmesh project local start entry
- eventmesh-spi : eventmesh SPI load module

> ps: The loading of connector plugin follows the Java SPI mechanism, it's necessary to configure the mapping file of
related interface and implementation class under /main/resources/meta-inf/services in the corresponding module
> ps: The loading of connector plugin follows the eventmesh SPI mechanism, it's necessary to configure the mapping file of
related interface and implementation class under /main/resources/meta-inf/eventmesh in the corresponding module

**2.3.2 Configure VM Options**

Expand All @@ -76,18 +77,17 @@ related interface and implementation class under /main/resources/meta-inf/servic
```
> ps: If you use Windows, you may need to replace the file separator to \
**2.3.3 Configure build.gradle file**
**2.3.3 Configure plugin**

Specify the connector that will be loaded after the project start with updating compile project item in dependencies
Specify the connector plugin that will be loaded after the project start by declaring in `eventMesh.properties`

update `build.gradle` file under the `eventmesh-starter` module
Modify the `eventMesh.properties` file in the `confPath` directory

load **rocketmq connector** configuration:

```java
dependencies {
compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq")
}
#connector plugin
eventMesh.connector.plugin.type=rocketmq
```

**2.3.4 Run**
Expand Down
Binary file modified docs/images/project-structure.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,18 @@

package org.apache.eventmesh.common.config;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;

import com.google.common.base.Preconditions;

import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.IPUtil;

public class CommonConfiguration {
public String eventMeshEnv = "P";
public String eventMeshIDC = "FT";
public String eventMeshCluster = "LS";
public String eventMeshName = "";
public String sysID = "5477";

public String eventMeshConnectorPluginType = "rocketmq";

public String namesrvAddr = "";
public String clientUserName = "username";
Expand Down Expand Up @@ -84,8 +77,11 @@ public void init() {

eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP);
if (StringUtils.isBlank(eventMeshServerIp)) {
eventMeshServerIp = getLocalAddr();
eventMeshServerIp = IPUtil.getLocalAddress();
}

eventMeshConnectorPluginType = configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE);
Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE));
}
}

Expand All @@ -105,94 +101,7 @@ static class ConfKeys {
public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills";

public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills";
}

public static String getLocalAddr() {
//priority of networkInterface when generating client ip
String priority = System.getProperty("networkInterface.priority", "bond1<eth1<eth0");
ArrayList<String> preferList = new ArrayList<String>();
for (String eth : priority.split("<")) {
preferList.add(eth);
}
NetworkInterface preferNetworkInterface = null;

try {
Enumeration<NetworkInterface> enumeration1 = NetworkInterface.getNetworkInterfaces();
while (enumeration1.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration1.nextElement();
if (!preferList.contains(networkInterface.getName())) {
continue;
} else if (preferNetworkInterface == null) {
preferNetworkInterface = networkInterface;
}
//get the networkInterface that has higher priority
else if (preferList.indexOf(networkInterface.getName())
> preferList.indexOf(preferNetworkInterface.getName())) {
preferNetworkInterface = networkInterface;
}
}

// Traversal Network interface to get the first non-loopback and non-private address
ArrayList<String> ipv4Result = new ArrayList<String>();
ArrayList<String> ipv6Result = new ArrayList<String>();

if (preferNetworkInterface != null) {
final Enumeration<InetAddress> en = preferNetworkInterface.getInetAddresses();
getIpResult(ipv4Result, ipv6Result, en);
} else {
Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
while (enumeration.hasMoreElements()) {
final NetworkInterface networkInterface = enumeration.nextElement();
final Enumeration<InetAddress> en = networkInterface.getInetAddresses();
getIpResult(ipv4Result, ipv6Result, en);
}
}

// prefer ipv4
if (!ipv4Result.isEmpty()) {
for (String ip : ipv4Result) {
if (ip.startsWith("127.0") || ip.startsWith("192.168")) {
continue;
}

return ip;
}

return ipv4Result.get(ipv4Result.size() - 1);
} else if (!ipv6Result.isEmpty()) {
return ipv6Result.get(0);
}
//If failed to find,fall back to localhost
final InetAddress localHost = InetAddress.getLocalHost();
return normalizeHostAddress(localHost);
} catch (SocketException e) {
e.printStackTrace();
} catch (UnknownHostException e) {
e.printStackTrace();
}

return null;
}

public static String normalizeHostAddress(final InetAddress localHost) {
if (localHost instanceof Inet6Address) {
return "[" + localHost.getHostAddress() + "]";
} else {
return localHost.getHostAddress();
}
}

private static void getIpResult(ArrayList<String> ipv4Result, ArrayList<String> ipv6Result,
Enumeration<InetAddress> en) {
while (en.hasMoreElements()) {
final InetAddress address = en.nextElement();
if (!address.isLoopbackAddress()) {
if (address instanceof Inet6Address) {
ipv6Result.add(normalizeHostAddress(address));
} else {
ipv4Result.add(normalizeHostAddress(address));
}
}
}
public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type";
}
}
4 changes: 2 additions & 2 deletions eventmesh-connector-api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ List open_message = [
]

dependencies {
implementation open_message,project(":eventmesh-common")
testImplementation open_message,project(":eventmesh-common")
implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi")
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.openmessaging.api.Message;

import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI
public interface MeshMQPushConsumer extends Consumer {

void init(Properties keyValue) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import io.openmessaging.api.SendCallback;

import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.spi.EventMeshSPI;

@EventMeshSPI
public interface MeshMQProducer extends Producer {

void init(Properties properties) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl
4 changes: 2 additions & 2 deletions eventmesh-runtime/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ List open_message = [


dependencies {
implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common")
testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api")
implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi")
testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi")
}
5 changes: 4 additions & 1 deletion eventmesh-runtime/conf/eventmesh.properties
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106
eventMesh.server.registry.registerIntervalInMills=10000
eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#auto-ack
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5

#connector plugin
eventMesh.connector.plugin.type=rocketmq
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;

import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Message;
Expand All @@ -35,6 +34,14 @@ public class MQConsumerWrapper extends MQWrapper {

protected MeshMQPushConsumer meshMQPushConsumer;

public MQConsumerWrapper(String connectorPluginType) {
this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType);
if (meshMQPushConsumer == null) {
logger.error("can't load the meshMQPushConsumer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
}
}

public void subscribe(String topic, AsyncMessageListener listener) throws Exception {
meshMQPushConsumer.subscribe(topic, listener);
}
Expand All @@ -44,24 +51,10 @@ public void unsubscribe(String topic) throws Exception {
}

public synchronized void init(Properties keyValue) throws Exception {
meshMQPushConsumer = getMeshMQPushConsumer();
if (meshMQPushConsumer == null) {
logger.error("can't load the meshMQPushConsumer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check.");
}

meshMQPushConsumer.init(keyValue);
inited.compareAndSet(false, true);
}

private MeshMQPushConsumer getMeshMQPushConsumer() {
ServiceLoader<MeshMQPushConsumer> meshMQPushConsumerServiceLoader = ServiceLoader.load(MeshMQPushConsumer.class);
if (meshMQPushConsumerServiceLoader.iterator().hasNext()) {
return meshMQPushConsumerServiceLoader.iterator().next();
}
return null;
}

public synchronized void start() throws Exception {
meshMQPushConsumer.start();
started.compareAndSet(false, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,21 @@ public class MQProducerWrapper extends MQWrapper {

protected MeshMQProducer meshMQProducer;

public synchronized void init(Properties keyValue) throws Exception {
if (inited.get()) {
return;
}

meshMQProducer = getSpiMeshMQProducer();
public MQProducerWrapper(String connectorPluginType) {
this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType);
if (meshMQProducer == null) {
logger.error("can't load the meshMQProducer plugin, please check.");
throw new RuntimeException("doesn't load the meshMQProducer plugin, please check.");
}
meshMQProducer.init(keyValue);

inited.compareAndSet(false, true);
}

private MeshMQProducer getSpiMeshMQProducer() {
ServiceLoader<MeshMQProducer> meshMQProducerServiceLoader = ServiceLoader.load(MeshMQProducer.class);
if (meshMQProducerServiceLoader.iterator().hasNext()) {
return meshMQProducerServiceLoader.iterator().next();
public synchronized void init(Properties keyValue) throws Exception {
if (inited.get()) {
return;
}
return null;
meshMQProducer.init(keyValue);

inited.compareAndSet(false, true);
}

public synchronized void start() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to Apache Software Foundation (ASF) under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Apache Software Foundation (ASF) licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.eventmesh.runtime.core.plugin;

import org.apache.eventmesh.api.consumer.MeshMQPushConsumer;
import org.apache.eventmesh.api.producer.MeshMQProducer;
import org.apache.eventmesh.spi.EventMeshExtensionFactory;

public class PluginFactory {

public static MeshMQProducer getMeshMQProducer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName);
}

public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) {
return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName);
}

private static <T> T getPlugin(Class<T> pluginType, String pluginName) {
return EventMeshExtensionFactory.getExtension(pluginType, pluginName);
}
}
Loading

0 comments on commit 5cae93c

Please sign in to comment.