diff --git a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md index adc9703796..6ae41fa431 100644 --- a/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md +++ b/docs/cn/instructions/eventmesh-runtime-quickstart.zh-CN.md @@ -44,7 +44,7 @@ sh start.sh ### 2.1 依赖 -同上述步骤 1.1 +同上述步骤 1.1,但是只能在JDK 1.8下构建 ### 2.2 下载源码 @@ -62,32 +62,32 @@ sh start.sh - eventmesh-runtime : eventmesh运行时模块 - eventmesh-sdk-java : eventmesh java客户端sdk - eventmesh-starter : eventmesh本地启动运行项目入口 +- eventmesh-spi : eventmesh SPI加载模块 -> 注:插件模块遵循java spi机制,需要在对应模块中的/main/resources/META-INF/services 下配置相关接口与实现类的映射文件 +> 注:插件模块遵循eventmesh定义的spi机制,需要在对应模块中的/main/resources/META-INF/eventmesh 下配置相关接口与实现类的映射文件 -**2.3.2 配置VM启动参数** +**2.3.2 配置插件** -```java --Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml --Deventmesh.log.home=eventmesh-runtime/logs --Deventmesh.home=eventmesh-runtime --DconfPath=eventmesh-runtime/conf -``` -> 注:如果操作系统为Windows, 可能需要将文件分隔符换成\ +在`eventMesh.properties`配置文件通过声明式的方式来指定项目启动后需要加载的插件 -**2.3.3 配置build.gradle文件** +修改`confPath`目录下面的`eventMesh.properties`文件 -通过修改dependencies,compile project 项来指定项目启动后加载的插件 +加载**RocketMQ Connector**插件配置: -修改`eventmesh-starter`模块下面的`build.gradle`文件 +```java +#connector plugin +eventMesh.connector.plugin.type=rocketmq +``` -加载**RocketMQ**插件配置: +**2.3.3 配置VM启动参数** ```java -dependencies { - compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq") -} +-Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml +-Deventmesh.log.home=eventmesh-runtime/logs +-Deventmesh.home=eventmesh-runtime +-DconfPath=eventmesh-runtime/conf ``` +> 注:如果操作系统为Windows, 可能需要将文件分隔符换成\ **2.3.4 启动运行** diff --git a/docs/en/instructions/eventmesh-runtime-quickstart.md b/docs/en/instructions/eventmesh-runtime-quickstart.md index 0536e5b6ee..62b315c57e 100644 --- a/docs/en/instructions/eventmesh-runtime-quickstart.md +++ b/docs/en/instructions/eventmesh-runtime-quickstart.md @@ -44,7 +44,7 @@ If you see "EventMeshTCPServer[port=10000] started....", you setup runtime succe ### 2.1 dependencies -Same with 1.1 +Same with 1.1, but it can be only compiled in JDK 1.8 ### 2.2 download sources @@ -62,33 +62,35 @@ Same with 1.2 - eventmesh-runtime : eventmesh runtime module - eventmesh-sdk-java : eventmesh java client sdk - eventmesh-starter : eventmesh project local start entry +- eventmesh-spi : eventmesh SPI load module -> ps: The loading of connector plugin follows the Java SPI mechanism, it's necessary to configure the mapping file of -related interface and implementation class under /main/resources/meta-inf/services in the corresponding module +> ps: The loading of connector plugin follows the eventmesh SPI mechanism, it's necessary to configure the mapping file of +related interface and implementation class under /main/resources/meta-inf/eventmesh in the corresponding module -**2.3.2 Configure VM Options** +**2.3.2 Configure plugin** -```java --Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml -Deventmesh.log.home=eventmesh-runtime/logs --Deventmesh.home=eventmesh-runtime --DconfPath=eventmesh-runtime/conf -``` -> ps: If you use Windows, you may need to replace the file separator to \ -**2.3.3 Configure build.gradle file** +Specify the connector plugin that will be loaded after the project start by declaring in `eventMesh.properties` -Specify the connector that will be loaded after the project start with updating compile project item in dependencies - -update `build.gradle` file under the `eventmesh-starter` module +Modify the `eventMesh.properties` file in the `confPath` directory load **rocketmq connector** configuration: ```java -dependencies { - compile project(":eventmesh-runtime"), project(":eventmesh-connector-rocketmq") -} +#connector plugin +eventMesh.connector.plugin.type=rocketmq +``` + +**2.3.3 Configure VM Options** + +```java +-Dlog4j.configurationFile=eventmesh-runtime/conf/log4j2.xml +-Deventmesh.log.home=eventmesh-runtime/logs +-Deventmesh.home=eventmesh-runtime +-DconfPath=eventmesh-runtime/conf ``` +> ps: If you use Windows, you may need to replace the file separator to \ **2.3.4 Run** diff --git a/docs/images/project-structure.png b/docs/images/project-structure.png index 252a9536c6..efc5249e59 100644 Binary files a/docs/images/project-structure.png and b/docs/images/project-structure.png differ diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java index aecfb0e3d0..895215d0da 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ThreadUtil.java @@ -37,7 +37,7 @@ public static void randomSleep(int max) throws Exception { /** * get current process id only once. * - * @return + * @return process id */ public static long getPID() { if (currentPID >= 0) { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java index 08a44cb8e1..1ae6b2604a 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/CommonConfiguration.java @@ -17,17 +17,10 @@ package org.apache.eventmesh.common.config; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Enumeration; - import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; +import org.apache.eventmesh.common.IPUtil; public class CommonConfiguration { public String eventMeshEnv = "P"; @@ -35,7 +28,7 @@ public class CommonConfiguration { public String eventMeshCluster = "LS"; public String eventMeshName = ""; public String sysID = "5477"; - + public String eventMeshConnectorPluginType = "rocketmq"; public String namesrvAddr = ""; public String clientUserName = "username"; @@ -84,8 +77,11 @@ public void init() { eventMeshServerIp = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_SERVER_HOST_IP); if (StringUtils.isBlank(eventMeshServerIp)) { - eventMeshServerIp = getLocalAddr(); + eventMeshServerIp = IPUtil.getLocalAddress(); } + + eventMeshConnectorPluginType = configurationWraper.getProp(ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE); + Preconditions.checkState(StringUtils.isNotEmpty(eventMeshConnectorPluginType), String.format("%s error", ConfKeys.KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE)); } } @@ -105,94 +101,7 @@ static class ConfKeys { public static String KEYS_EVENTMESH_SERVER_REGISTER_INTERVAL = "eventMesh.server.registry.registerIntervalInMills"; public static String KEYS_EVENTMESH_SERVER_FETCH_REGISTRY_ADDR_INTERVAL = "eventMesh.server.registry.fetchRegistryAddrIntervalInMills"; - } - - public static String getLocalAddr() { - //priority of networkInterface when generating client ip - String priority = System.getProperty("networkInterface.priority", "bond1 preferList = new ArrayList(); - for (String eth : priority.split("<")) { - preferList.add(eth); - } - NetworkInterface preferNetworkInterface = null; - - try { - Enumeration enumeration1 = NetworkInterface.getNetworkInterfaces(); - while (enumeration1.hasMoreElements()) { - final NetworkInterface networkInterface = enumeration1.nextElement(); - if (!preferList.contains(networkInterface.getName())) { - continue; - } else if (preferNetworkInterface == null) { - preferNetworkInterface = networkInterface; - } - //get the networkInterface that has higher priority - else if (preferList.indexOf(networkInterface.getName()) - > preferList.indexOf(preferNetworkInterface.getName())) { - preferNetworkInterface = networkInterface; - } - } - - // Traversal Network interface to get the first non-loopback and non-private address - ArrayList ipv4Result = new ArrayList(); - ArrayList ipv6Result = new ArrayList(); - - if (preferNetworkInterface != null) { - final Enumeration en = preferNetworkInterface.getInetAddresses(); - getIpResult(ipv4Result, ipv6Result, en); - } else { - Enumeration enumeration = NetworkInterface.getNetworkInterfaces(); - while (enumeration.hasMoreElements()) { - final NetworkInterface networkInterface = enumeration.nextElement(); - final Enumeration en = networkInterface.getInetAddresses(); - getIpResult(ipv4Result, ipv6Result, en); - } - } - - // prefer ipv4 - if (!ipv4Result.isEmpty()) { - for (String ip : ipv4Result) { - if (ip.startsWith("127.0") || ip.startsWith("192.168")) { - continue; - } - - return ip; - } - return ipv4Result.get(ipv4Result.size() - 1); - } else if (!ipv6Result.isEmpty()) { - return ipv6Result.get(0); - } - //If failed to find,fall back to localhost - final InetAddress localHost = InetAddress.getLocalHost(); - return normalizeHostAddress(localHost); - } catch (SocketException e) { - e.printStackTrace(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } - - return null; - } - - public static String normalizeHostAddress(final InetAddress localHost) { - if (localHost instanceof Inet6Address) { - return "[" + localHost.getHostAddress() + "]"; - } else { - return localHost.getHostAddress(); - } - } - - private static void getIpResult(ArrayList ipv4Result, ArrayList ipv6Result, - Enumeration en) { - while (en.hasMoreElements()) { - final InetAddress address = en.nextElement(); - if (!address.isLoopbackAddress()) { - if (address instanceof Inet6Address) { - ipv6Result.add(normalizeHostAddress(address)); - } else { - ipv4Result.add(normalizeHostAddress(address)); - } - } - } + public static String KEYS_ENENTMESH_CONNECTOR_PLUGIN_TYPE = "eventMesh.connector.plugin.type"; } } \ No newline at end of file diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java index fc741a5a82..fb7a992237 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/RandomLoadBalanceSelector.java @@ -28,7 +28,7 @@ * This selector use random strategy. * Each selection will randomly give one from the given list * - * @param + * @param Target type */ public class RandomLoadBalanceSelector implements LoadBalanceSelector { diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java index d6f2009bf3..c0310003ad 100644 --- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java +++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/WeightRoundRobinLoadBalanceSelector.java @@ -27,7 +27,7 @@ * This selector use the weighted round robin strategy to select from list. * If the weight is greater, the probability of being selected is larger. * - * @param + * @param Target type */ public class WeightRoundRobinLoadBalanceSelector implements LoadBalanceSelector { diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/LiteMessageTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/LiteMessageTest.java new file mode 100644 index 0000000000..2f206a8d82 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/LiteMessageTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class LiteMessageTest { + + @Test + public void testGetProp() { + LiteMessage message = createLiteMessage(); + Assert.assertEquals(2L, message.getProp().size()); + } + + @Test + public void testSetProp() { + LiteMessage message = createLiteMessage(); + Map prop = new HashMap<>(); + prop.put("key3", "value3"); + message.setProp(prop); + Assert.assertEquals(1L, message.getProp().size()); + Assert.assertEquals("value3", message.getPropKey("key3")); + } + + @Test + public void testAddProp() { + LiteMessage message = createLiteMessage(); + message.addProp("key3", "value3"); + Assert.assertEquals(3L, message.getProp().size()); + Assert.assertEquals("value1", message.getPropKey("key1")); + } + + @Test + public void testGetPropKey() { + LiteMessage message = createLiteMessage(); + Assert.assertEquals("value1", message.getPropKey("key1")); + } + + @Test + public void testRemoveProp() { + LiteMessage message = createLiteMessage(); + message.removeProp("key1"); + Assert.assertEquals(1L, message.getProp().size()); + Assert.assertNull(message.getPropKey("key1")); + } + + private LiteMessage createLiteMessage() { + LiteMessage result = new LiteMessage(); + Map prop = new HashMap<>(); + prop.put("key1", "value1"); + prop.put("key2", "value2"); + result.setProp(prop); + return result; + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/command/HttpCommandTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/command/HttpCommandTest.java new file mode 100644 index 0000000000..ce5ac0c80b --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/command/HttpCommandTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.command; + +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import org.apache.eventmesh.common.protocol.http.body.BaseResponseBody; +import org.apache.eventmesh.common.protocol.http.body.Body; +import org.apache.eventmesh.common.protocol.http.header.Header; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class HttpCommandTest { + + @Mock + private Header header; + + @Mock + private Body body; + + private HttpCommand httpCommand; + + @Before + public void before() { + httpCommand = new HttpCommand("POST", "1.1", "200"); + } + + @Test + public void testCreateHttpCommandResponseWithHeaderAndBody() { + HttpCommand command = httpCommand.createHttpCommandResponse(header, body); + Map headerMap = new HashMap<>(); + headerMap.put("key1", "value1"); + when(header.toMap()).thenReturn(headerMap); + Assert.assertEquals("1.1", command.getHttpVersion()); + Assert.assertEquals("POST", command.getHttpMethod()); + Assert.assertEquals("200", command.getRequestCode()); + Assert.assertEquals("value1", command.getHeader().toMap().get("key1")); + } + + @Test + public void testCreateHttpCommandResponseWithRetCodeAndRetMsg() { + HttpCommand command = httpCommand.createHttpCommandResponse(200, "SUCCESS"); + Assert.assertThat(((BaseResponseBody) command.getBody()).getRetCode(), is(200)); + Assert.assertEquals("SUCCESS", ((BaseResponseBody) command.getBody()).getRetMsg()); + } + + @Test + public void testAbstractDesc() { + HttpCommand command = httpCommand.createHttpCommandResponse(header, body); + String desc = command.abstractDesc(); + Assert.assertTrue(desc.startsWith("httpCommand")); + } + + @Test + public void testSimpleDesc() { + HttpCommand command = httpCommand.createHttpCommandResponse(header, body); + String desc = command.simpleDesc(); + Assert.assertTrue(desc.startsWith("httpCommand")); + } + + @Test + public void testHttpResponse() throws Exception { + HttpCommand command = httpCommand.createHttpCommandResponse(header, body); + DefaultFullHttpResponse response = command.httpResponse(); + Assert.assertEquals("keep-alive", response.headers().get(HttpHeaderNames.CONNECTION)); + } + + @Test + public void testHttpResponseWithREQCmdType() throws Exception { + DefaultFullHttpResponse response = httpCommand.httpResponse(); + Assert.assertNull(response); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java new file mode 100644 index 0000000000..7880cddb00 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/CommonConfigurationTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CommonConfigurationTest { + + private CommonConfiguration configuration; + + @Before + public void before() { + String file = ConfigurationWraperTest.class.getResource("/configuration.properties").getFile(); + ConfigurationWraper wraper = new ConfigurationWraper(file, false); + configuration = new CommonConfiguration(wraper); + } + + @Test + public void testInit() { + configuration.init(); + Assert.assertEquals("value1", configuration.eventMeshEnv); + Assert.assertEquals("value2", configuration.eventMeshIDC); + Assert.assertEquals("3", configuration.sysID); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigurationWraperTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigurationWraperTest.java new file mode 100644 index 0000000000..7a89efa6ab --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/config/ConfigurationWraperTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.config; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ConfigurationWraperTest { + + private ConfigurationWraper wraper; + + @Before + public void before() { + String file = ConfigurationWraperTest.class.getResource("/configuration.properties").getFile(); + wraper = new ConfigurationWraper(file, false); + } + + @Test + public void testGetProp() { + Assert.assertEquals("value1", wraper.getProp("eventMesh.server.env")); + Assert.assertEquals("value2", wraper.getProp("eventMesh.server.idc")); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightTest.java new file mode 100644 index 0000000000..bdf7a845f2 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/WeightTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.loadbalance; + +import org.junit.Assert; +import org.junit.Test; + +public class WeightTest { + + @Test + public void testDecreaseTotal() { + Weight weight = new Weight(null, 0); + weight.decreaseTotal(1); + Assert.assertEquals(-1, weight.getCurrentWeight().get()); + } + + @Test + public void testIncreaseCurrentWeight() { + Weight weight = new Weight(null, 10); + weight.increaseCurrentWeight(); + Assert.assertEquals(10, weight.getCurrentWeight().get()); + } + + @Test + public void testGetCurrentWeight() { + Weight weight = new Weight(null, 0); + Assert.assertEquals(0, weight.getCurrentWeight().get()); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/body/BaseResponseBodyTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/body/BaseResponseBodyTest.java new file mode 100644 index 0000000000..edba782483 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/body/BaseResponseBodyTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.body; + +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.junit.Assert; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; + +public class BaseResponseBodyTest { + + @Test + public void testToMap() { + BaseResponseBody body = new BaseResponseBody(); + body.setRetCode(200); + body.setRetMsg("SUCCESS"); + Assert.assertTrue(body.toMap().containsKey(ProtocolKey.RETCODE)); + Assert.assertTrue(body.toMap().containsKey(ProtocolKey.RETMSG)); + Assert.assertTrue(body.toMap().containsKey(ProtocolKey.RESTIME)); + Assert.assertThat(body.toMap().get(ProtocolKey.RETCODE), is(200)); + Assert.assertThat(body.toMap().get(ProtocolKey.RETMSG), is("SUCCESS")); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseRequestHeaderTest.java new file mode 100644 index 0000000000..e511f192cb --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseRequestHeaderTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header; + + +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.is; + +public class BaseRequestHeaderTest { + + @Test + public void testToMap() { + Map headerParam = new HashMap<>(); + headerParam.put(ProtocolKey.REQUEST_CODE, "200"); + BaseRequestHeader header = BaseRequestHeader.buildHeader(headerParam); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE)); + Assert.assertThat(header.toMap().get(ProtocolKey.REQUEST_CODE), is("200")); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseResponseHeaderTest.java new file mode 100644 index 0000000000..f963eea55e --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/BaseResponseHeaderTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header; + +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.junit.Assert; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; + +public class BaseResponseHeaderTest { + + @Test + public void testToMap() { + BaseResponseHeader header = BaseResponseHeader.buildHeader("200"); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE)); + Assert.assertThat(header.toMap().get(ProtocolKey.REQUEST_CODE), is("200")); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractRequestHeaderTest.java new file mode 100644 index 0000000000..115aa12eea --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractRequestHeaderTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.http.header.Header; +import org.junit.Assert; + +public class AbstractRequestHeaderTest { + + public void assertMapContent(Header header) { + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.LANGUAGE)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.VERSION)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.ENV)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.IDC)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.SYS)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.PID)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.IP)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.USERNAME)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.ClientInstanceKey.PASSWD)); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractResponseHeaderTest.java new file mode 100644 index 0000000000..0b5b0bda92 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/AbstractResponseHeaderTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.apache.eventmesh.common.protocol.http.header.Header; +import org.junit.Assert; + +import static org.hamcrest.CoreMatchers.is; + +public class AbstractResponseHeaderTest { + + public void assertMapContent(Header header) { + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.REQUEST_CODE)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV)); + Assert.assertTrue(header.toMap().containsKey(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC)); + Assert.assertThat(header.toMap().get(ProtocolKey.REQUEST_CODE), is(200)); + Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER), is("CLUSTER")); + Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP), is("127.0.0.1")); + Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV), is("DEV")); + Assert.assertThat(header.toMap().get(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC), is("IDC")); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeaderTest.java new file mode 100644 index 0000000000..7b1afaeea0 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatRequestHeaderTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + + +import org.junit.Test; + +import java.util.HashMap; + +public class HeartbeatRequestHeaderTest extends AbstractRequestHeaderTest { + + @Test + public void testToMap() { + HeartbeatRequestHeader header = HeartbeatRequestHeader.buildHeader(new HashMap<>()); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeaderTest.java new file mode 100644 index 0000000000..d27aeaf8d4 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/HeartbeatResponseHeaderTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.apache.eventmesh.common.protocol.http.common.ProtocolKey; +import org.junit.Assert; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; + +public class HeartbeatResponseHeaderTest extends AbstractResponseHeaderTest { + + @Test + public void testToMap() { + HeartbeatResponseHeader header = HeartbeatResponseHeader.buildHeader(200, + "CLUSTER", "127.0.0.1", "DEV", "IDC"); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeaderTest.java new file mode 100644 index 0000000000..dbeb33bf26 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegRequestHeaderTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.junit.Test; + +import java.util.HashMap; + +public class RegRequestHeaderTest extends AbstractRequestHeaderTest { + + @Test + public void testToMap() { + RegRequestHeader header = RegRequestHeader.buildHeader(new HashMap<>()); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeaderTest.java new file mode 100644 index 0000000000..503c6425ed --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/RegResponseHeaderTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.junit.Test; + +public class RegResponseHeaderTest extends AbstractResponseHeaderTest { + + @Test + public void testToMap() { + RegResponseHeader header = RegResponseHeader.buildHeader(200, + "CLUSTER", "127.0.0.1", "DEV", "IDC"); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeaderTest.java new file mode 100644 index 0000000000..e883688903 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeRequestHeaderTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.junit.Test; + +import java.util.HashMap; + +public class SubscribeRequestHeaderTest extends AbstractRequestHeaderTest { + + @Test + public void testToMap() { + SubscribeRequestHeader header = SubscribeRequestHeader.buildHeader(new HashMap<>()); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeaderTest.java new file mode 100644 index 0000000000..9d646338dd --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/SubscribeResponseHeaderTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + + +import org.junit.Test; + +public class SubscribeResponseHeaderTest extends AbstractResponseHeaderTest { + + @Test + public void testToMap() { + SubscribeResponseHeader header = SubscribeResponseHeader.buildHeader(200, + "CLUSTER", "127.0.0.1", "DEV", "IDC"); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeaderTest.java new file mode 100644 index 0000000000..9e77a21711 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegRequestHeaderTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + + +import org.junit.Test; + +import java.util.HashMap; + +public class UnRegRequestHeaderTest extends AbstractRequestHeaderTest { + + @Test + public void testToMap() { + UnRegRequestHeader header = UnRegRequestHeader.buildHeader(new HashMap<>()); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeaderTest.java new file mode 100644 index 0000000000..0c7b016417 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnRegResponseHeaderTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.junit.Test; + +public class UnRegResponseHeaderTest extends AbstractResponseHeaderTest { + + @Test + public void testToMap() { + UnRegResponseHeader header = UnRegResponseHeader.buildHeader(200, + "CLUSTER", "127.0.0.1", "DEV", "IDC"); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeaderTest.java new file mode 100644 index 0000000000..5ee7c9e509 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeRequestHeaderTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + +import org.junit.Test; + +import java.util.HashMap; + +public class UnSubscribeRequestHeaderTest extends AbstractRequestHeaderTest { + + @Test + public void testToMap() { + UnSubscribeRequestHeader header = UnSubscribeRequestHeader.buildHeader(new HashMap<>()); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeaderTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeaderTest.java new file mode 100644 index 0000000000..3e8cb70ed4 --- /dev/null +++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/protocol/http/header/client/UnSubscribeResponseHeaderTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.common.protocol.http.header.client; + + +import org.junit.Test; + +public class UnSubscribeResponseHeaderTest extends AbstractResponseHeaderTest { + + @Test + public void testToMap() { + UnSubscribeResponseHeader header = UnSubscribeResponseHeader.buildHeader(200, + "CLUSTER", "127.0.0.1", "DEV", "IDC"); + assertMapContent(header); + } +} diff --git a/eventmesh-common/src/test/resources/configuration.properties b/eventmesh-common/src/test/resources/configuration.properties new file mode 100644 index 0000000000..76f29f2771 --- /dev/null +++ b/eventmesh-common/src/test/resources/configuration.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +eventMesh.server.env=value1 +eventMesh.server.idc=value2 +eventMesh.sysid=3 +eventMesh.server.cluster=value4 +eventMesh.server.name=value5 +eventMesh.server.hostIp=value6 +eventMesh.connector.plugin.type=rocketmq diff --git a/eventmesh-connector-api/build.gradle b/eventmesh-connector-api/build.gradle index 2d1205df38..157048ecc4 100644 --- a/eventmesh-connector-api/build.gradle +++ b/eventmesh-connector-api/build.gradle @@ -20,6 +20,6 @@ List open_message = [ ] dependencies { - implementation open_message,project(":eventmesh-common") - testImplementation open_message,project(":eventmesh-common") + implementation open_message,project(":eventmesh-common"), project(":eventmesh-spi") + testImplementation open_message,project(":eventmesh-common"), project(":eventmesh-spi") } diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java index 5e60e0e0df..4ac1edbfc8 100644 --- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java +++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/consumer/MeshMQPushConsumer.java @@ -25,7 +25,9 @@ import io.openmessaging.api.Message; import org.apache.eventmesh.api.AbstractContext; +import org.apache.eventmesh.spi.EventMeshSPI; +@EventMeshSPI public interface MeshMQPushConsumer extends Consumer { void init(Properties keyValue) throws Exception; diff --git a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java index 82ca583ce7..c717385e05 100644 --- a/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java +++ b/eventmesh-connector-api/src/main/java/org/apache/eventmesh/api/producer/MeshMQProducer.java @@ -24,7 +24,9 @@ import io.openmessaging.api.SendCallback; import org.apache.eventmesh.api.RRCallback; +import org.apache.eventmesh.spi.EventMeshSPI; +@EventMeshSPI public interface MeshMQProducer extends Producer { void init(Properties properties) throws Exception; diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java index 11a1858a6e..6fa41042b6 100644 --- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java +++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java @@ -33,7 +33,7 @@ public class ClientConfiguration { public Integer ackWindow = 1000; public Integer pubWindow = 100; public long consumeTimeout = 0L; - public Integer pollNameServerInteval = 10 * 1000; + public Integer pollNameServerInterval = 10 * 1000; public Integer heartbeatBrokerInterval = 30 * 1000; public Integer rebalanceInterval = 20 * 1000; @@ -104,18 +104,18 @@ public void init() { String clientPollNamesrvIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) { Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL)); - pollNameServerInteval = Integer.valueOf(clientPollNamesrvIntervalStr); + pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr); } - String clientHeartbeatBrokerIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVEL); + String clientHeartbeatBrokerIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVEL)); + Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL)); heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr); } - String clientRebalanceIntervalIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVEL); + String clientRebalanceIntervalIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL); if (StringUtils.isNotEmpty(clientRebalanceIntervalIntervalStr)) { - Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVEL)); + Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL)); rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr); } } @@ -144,9 +144,9 @@ static class ConfKeys { public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL = "eventMesh.server.rocketmq.client.pollNameServerInterval"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVEL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval"; + public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval"; - public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVEL = "eventMesh.server.rocketmq.client.rebalanceInterval"; + public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval"; } } \ No newline at end of file diff --git a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java index ba35acf186..906be5f239 100644 --- a/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java +++ b/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/utils/OMSUtil.java @@ -221,6 +221,9 @@ public static boolean isOMSHeader(String value) { /** * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. + * + * @param rmqResult RocketMQ result + * @return send result */ public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { SendResult sendResult = new SendResult(); @@ -241,6 +244,9 @@ public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.S /** * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. + * + * @param Target type + * @return Iterator */ public static Iterator cycle(final Iterable iterable) { return new Iterator() { diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer new file mode 100644 index 0000000000..0df2e286d7 --- /dev/null +++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.consumer.MeshMQPushConsumer @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rocketmq=org.apache.eventmesh.connector.rocketmq.consumer.RocketMQConsumerImpl \ No newline at end of file diff --git a/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer new file mode 100644 index 0000000000..ef4959d994 --- /dev/null +++ b/eventmesh-connector-rocketmq/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.producer.MeshMQProducer @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rocketmq=org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl \ No newline at end of file diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 09dd79f9c2..e5bb065659 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -31,6 +31,6 @@ List open_message = [ dependencies { - implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common") - testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api") + implementation metrics, open_message,project(":eventmesh-connector-api"),project(":eventmesh-common"),project(":eventmesh-spi") + testImplementation metrics,open_message,project(":eventmesh-common"),project(":eventmesh-connector-api"),project(":eventmesh-spi") } diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 035b950fc6..45fc193b4f 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -51,4 +51,7 @@ eventMesh.server.admin.http.port=10106 eventMesh.server.registry.registerIntervalInMills=10000 eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000 #auto-ack -#eventMesh.server.defibus.client.comsumeTimeoutInMin=5 \ No newline at end of file +#eventMesh.server.defibus.client.comsumeTimeoutInMin=5 + +#connector plugin +eventMesh.connector.plugin.type=rocketmq \ No newline at end of file diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java index a73c24a7b7..55e6896a5e 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/configuration/EventMeshHTTPConfiguration.java @@ -48,7 +48,7 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public int eventMeshServerRetryThreadNum = 2; - public int eventMeshServerPullRegistryIntervel = 30000; + public int eventMeshServerPullRegistryInterval = 30000; public int eventMeshServerAsyncAccumulationThreshold = 1000; @@ -62,7 +62,7 @@ public class EventMeshHTTPConfiguration extends CommonConfiguration { public int eventMeshServerClientManageBlockQSize = 1000; - public int eventMeshServerBusyCheckIntervel = 1000; + public int eventMeshServerBusyCheckInterval = 1000; public boolean eventMeshServerConsumerEnabled = false; @@ -126,9 +126,9 @@ public void init() { eventMeshServerClientManageThreadNum = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerClientManageThreadNumStr)); } - String eventMeshServerPullRegistryIntervelStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_PULL_REGISTRY_INTERVEL); - if (StringUtils.isNotEmpty(eventMeshServerPullRegistryIntervelStr) && StringUtils.isNumeric(eventMeshServerPullRegistryIntervelStr)) { - eventMeshServerPullRegistryIntervel = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerPullRegistryIntervelStr)); + String eventMeshServerPullRegistryIntervalStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_PULL_REGISTRY_INTERVAL); + if (StringUtils.isNotEmpty(eventMeshServerPullRegistryIntervalStr) && StringUtils.isNumeric(eventMeshServerPullRegistryIntervalStr)) { + eventMeshServerPullRegistryInterval = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerPullRegistryIntervalStr)); } String eventMeshServerAdminThreadNumStr = configurationWraper.getProp(ConfKeys.KEYS_EVENTMESH_ADMIN_THREAD_NUM); @@ -161,9 +161,9 @@ public void init() { eventMeshServerClientManageBlockQSize = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerClientManageBlockQSizeStr)); } - String eventMeshServerBusyCheckIntervelStr = configurationWraper.getProp(ConfKeys.KEY_EVENTMESH_BUSY_CHECK_INTERVEL); - if (StringUtils.isNotEmpty(eventMeshServerBusyCheckIntervelStr) && StringUtils.isNumeric(eventMeshServerBusyCheckIntervelStr)) { - eventMeshServerBusyCheckIntervel = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerBusyCheckIntervelStr)); + String eventMeshServerBusyCheckIntervalStr = configurationWraper.getProp(ConfKeys.KEY_EVENTMESH_BUSY_CHECK_INTERVAL); + if (StringUtils.isNotEmpty(eventMeshServerBusyCheckIntervalStr) && StringUtils.isNumeric(eventMeshServerBusyCheckIntervalStr)) { + eventMeshServerBusyCheckInterval = Integer.valueOf(StringUtils.deleteWhitespace(eventMeshServerBusyCheckIntervalStr)); } String eventMeshServerConsumerEnabledStr = configurationWraper.getProp(ConfKeys.KEY_EVENTMESH_CONSUMER_ENABLED); @@ -195,7 +195,7 @@ static class ConfKeys { public static String KEYS_EVENTMESH_ASYNC_ACCUMULATION_THRESHOLD = "eventMesh.server.async.accumulation.threshold"; - public static String KEY_EVENTMESH_BUSY_CHECK_INTERVEL = "eventMesh.server.busy.check.intervel"; + public static String KEY_EVENTMESH_BUSY_CHECK_INTERVAL = "eventMesh.server.busy.check.interval"; public static String KEYS_EVENTMESH_SENDMSG_THREAD_NUM = "eventMesh.server.sendmsg.threads.num"; @@ -211,7 +211,7 @@ static class ConfKeys { public static String KEY_EVENTMESH_RETRY_THREAD_NUM = "eventMesh.server.retry.threads.num"; - public static String KEYS_EVENTMESH_PULL_REGISTRY_INTERVEL = "eventMesh.server.pull.registry.intervel"; + public static String KEYS_EVENTMESH_PULL_REGISTRY_INTERVAL = "eventMesh.server.pull.registry.interval"; public static String KEY_EVENTMESH_RETRY_BLOCKQ_SIZE = "eventMesh.server.retry.blockQ.size"; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java index 080b7af40a..b09f9eda11 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQConsumerWrapper.java @@ -35,6 +35,14 @@ public class MQConsumerWrapper extends MQWrapper { protected MeshMQPushConsumer meshMQPushConsumer; + public MQConsumerWrapper(String connectorPluginType) { + this.meshMQPushConsumer = PluginFactory.getMeshMQPushConsumer(connectorPluginType); + if (meshMQPushConsumer == null) { + logger.error("can't load the meshMQPushConsumer plugin, please check."); + throw new RuntimeException("doesn't load the meshMQPushConsumer plugin, please check."); + } + } + public void subscribe(String topic, AsyncMessageListener listener) throws Exception { meshMQPushConsumer.subscribe(topic, listener); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java index 082ab3bc00..03c38a73fa 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/MQProducerWrapper.java @@ -34,6 +34,14 @@ public class MQProducerWrapper extends MQWrapper { protected MeshMQProducer meshMQProducer; + public MQProducerWrapper(String connectorPluginType) { + this.meshMQProducer = PluginFactory.getMeshMQProducer(connectorPluginType); + if (meshMQProducer == null) { + logger.error("can't load the meshMQProducer plugin, please check."); + throw new RuntimeException("doesn't load the meshMQProducer plugin, please check."); + } + } + public synchronized void init(Properties keyValue) throws Exception { if (inited.get()) { return; @@ -44,8 +52,8 @@ public synchronized void init(Properties keyValue) throws Exception { logger.error("can't load the meshMQProducer plugin, please check."); throw new RuntimeException("doesn't load the meshMQProducer plugin, please check."); } - meshMQProducer.init(keyValue); + meshMQProducer.init(keyValue); inited.compareAndSet(false, true); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java new file mode 100644 index 0000000000..b11495341b --- /dev/null +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/plugin/PluginFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to Apache Software Foundation (ASF) under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Apache Software Foundation (ASF) licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.eventmesh.runtime.core.plugin; + +import org.apache.eventmesh.api.consumer.MeshMQPushConsumer; +import org.apache.eventmesh.api.producer.MeshMQProducer; +import org.apache.eventmesh.spi.EventMeshExtensionFactory; + +public class PluginFactory { + + public static MeshMQProducer getMeshMQProducer(String connectorPluginName) { + return EventMeshExtensionFactory.getExtension(MeshMQProducer.class, connectorPluginName); + } + + public static MeshMQPushConsumer getMeshMQPushConsumer(String connectorPluginName) { + return EventMeshExtensionFactory.getExtension(MeshMQPushConsumer.class, connectorPluginName); + } + + private static T getPlugin(Class pluginType, String pluginName) { + return EventMeshExtensionFactory.getExtension(pluginType, pluginName); + } +} diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java index 8620e682ab..ef051b03f4 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/consumer/EventMeshConsumer.java @@ -66,13 +66,15 @@ public class EventMeshConsumer { private ConsumerGroupConf consumerGroupConf; - private MQConsumerWrapper persistentMqConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper persistentMqConsumer; - private MQConsumerWrapper broadcastMqConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper broadcastMqConsumer; public EventMeshConsumer(EventMeshHTTPServer eventMeshHTTPServer, ConsumerGroupConf consumerGroupConf) { this.eventMeshHTTPServer = eventMeshHTTPServer; this.consumerGroupConf = consumerGroupConf; + this.persistentMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType); + this.broadcastMqConsumer = new MQConsumerWrapper(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshConnectorPluginType); } private MessageHandler httpMessageHandler; diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java index cf41ca2b9e..fe32180879 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/producer/EventMeshProducer.java @@ -69,7 +69,7 @@ public boolean reply(final SendMessageContext sendMsgContext, final SendCallback return true; } - protected MQProducerWrapper mqProducerWrapper = new MQProducerWrapper(); + protected MQProducerWrapper mqProducerWrapper; public MQProducerWrapper getMqProducerWrapper() { return mqProducerWrapper; @@ -85,7 +85,7 @@ public synchronized void init(EventMeshHTTPConfiguration eventMeshHttpConfigurat //TODO for defibus keyValue.put("eventMeshIDC", eventMeshHttpConfiguration.eventMeshIDC); - + mqProducerWrapper = new MQProducerWrapper(eventMeshHttpConfiguration.eventMeshConnectorPluginType); mqProducerWrapper.init(keyValue); inited.compareAndSet(false, true); logger.info("EventMeshProducer [{}] inited.............", producerGroupConfig.getGroupName()); diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java index 5829e4941a..310ea6dee2 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.java @@ -50,6 +50,7 @@ import org.apache.eventmesh.runtime.constants.EventMeshConstants; import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper; import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper; +import org.apache.eventmesh.runtime.core.plugin.PluginFactory; import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session; import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext; @@ -96,14 +97,16 @@ public class ClientGroupWrapper { public AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE); - private MQConsumerWrapper persistentMsgConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper persistentMsgConsumer; - private MQConsumerWrapper broadCastMsgConsumer = new MQConsumerWrapper(); + private MQConsumerWrapper broadCastMsgConsumer; private ConcurrentHashMap> topic2sessionInGroupMapping = new ConcurrentHashMap>(); public AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE); + private MQProducerWrapper mqProducerWrapper; + public ClientGroupWrapper(String sysId, String producerGroup, String consumerGroup, EventMeshTCPServer eventMeshTCPServer, DownstreamDispatchStrategy downstreamDispatchStrategy) { @@ -115,6 +118,9 @@ public ClientGroupWrapper(String sysId, String producerGroup, String consumerGro this.eventMeshTcpRetryer = eventMeshTCPServer.getEventMeshTcpRetryer(); this.eventMeshTcpMonitor = eventMeshTCPServer.getEventMeshTcpMonitor(); this.downstreamDispatchStrategy = downstreamDispatchStrategy; + this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType); + this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType); + this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshConnectorPluginType); } public ConcurrentHashMap> getTopic2sessionInGroupMapping() { @@ -163,8 +169,6 @@ public void onException(OnExceptionContext context) { return true; } - private MQProducerWrapper mqProducerWrapper = new MQProducerWrapper(); - public MQProducerWrapper getMqProducerWrapper() { return mqProducerWrapper; } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java index 7841a67eaa..61d3c487f1 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/consumer/LiteConsumer.java @@ -96,7 +96,7 @@ public LiteConsumer(LiteClientConfig liteClientConfig, // this.remotingServer = new RemotingServer(this.consumeExecutor); } - private AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); + private final AtomicBoolean started = new AtomicBoolean(Boolean.FALSE); @Override public void start() throws Exception { @@ -226,15 +226,14 @@ public void run() { EventMeshRetObj ret = JSON.parseObject(res, EventMeshRetObj.class); - if (ret.getRetCode() == EventMeshRetCode.SUCCESS.getRetCode()) { - } else { + if (ret.getRetCode() != EventMeshRetCode.SUCCESS.getRetCode()) { throw new EventMeshException(ret.getRetCode(), ret.getRetMsg()); } } catch (Exception e) { logger.error("send heartBeat error", e); } } - }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS); + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); } public boolean unsubscribe(List topicList, String url) throws Exception { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java index 65aa40e918..04c275d3d5 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/EventMeshCommon.java @@ -31,7 +31,7 @@ public class EventMeshCommon { /** * CLIENT端心跳间隔时间 */ - public static int HEATBEAT = 30 * 1000; + public static int HEARTBEAT = 30 * 1000; /** * RR 废弃清理的时间间隔 diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java index 7df1e29f30..5513e1310e 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/common/TcpClient.java @@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory; public abstract class TcpClient implements Closeable { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); public int clientNo = (new Random()).nextInt(1000); @@ -67,8 +67,6 @@ public abstract class TcpClient implements Closeable { protected static final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(4, new EventMeshThreadFactoryImpl("TCPClientScheduler", true)); - private ScheduledFuture task; - public TcpClient(String host, int port) { this.host = host; this.port = port; @@ -119,7 +117,7 @@ protected void send(Package msg) throws Exception { if (channel.isWritable()) { channel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { - logger.warn("send msg failed", future.isSuccess(), future.cause()); + logger.warn("send msg failed", future.cause()); } }); } else { diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java index da5691a79f..df05f4bb61 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimplePubClientImpl.java @@ -41,7 +41,7 @@ public class SimplePubClientImpl extends TcpClient implements SimplePubClient { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private UserAgent userAgent; @@ -90,10 +90,10 @@ public void run() { } Package msg = MessageUtils.heartBeat(); io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception e) { + } catch (Exception ignore) { } } - }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS); + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); } private void goodbye() throws Exception { @@ -176,16 +176,13 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep Package pkg = MessageUtils.responseToClientAck(msg); send(pkg); } else if (cmd == Command.SERVER_GOODBYE_REQUEST) { - + //TODO } RequestContext context = contexts.get(RequestContext._key(msg)); if (context != null) { contexts.remove(context.getKey()); context.finish(msg); - return; - } else { - return; } } } diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java index 38d52f653d..7e341cadd4 100644 --- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java +++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/tcp/impl/SimpleSubClientImpl.java @@ -44,7 +44,7 @@ public class SimpleSubClientImpl extends TcpClient implements SimpleSubClient { - private Logger logger = LoggerFactory.getLogger(this.getClass()); + private final Logger logger = LoggerFactory.getLogger(this.getClass()); private UserAgent userAgent; @@ -101,10 +101,10 @@ public void run() { } Package msg = MessageUtils.heartBeat(); SimpleSubClientImpl.this.io(msg, EventMeshCommon.DEFAULT_TIME_OUT_MILLS); - } catch (Exception e) { + } catch (Exception ignore) { } } - }, EventMeshCommon.HEATBEAT, EventMeshCommon.HEATBEAT, TimeUnit.MILLISECONDS); + }, EventMeshCommon.HEARTBEAT, EventMeshCommon.HEARTBEAT, TimeUnit.MILLISECONDS); } private void goodbye() throws Exception { @@ -172,10 +172,8 @@ protected void channelRead0(ChannelHandlerContext ctx, Package msg) throws Excep if (context != null) { contexts.remove(context.getKey()); context.finish(msg); - return; } else { logger.error("msg ignored,context not found.|{}|{}", cmd, msg); - return; } } } diff --git a/eventmesh-spi/build.gradle b/eventmesh-spi/build.gradle new file mode 100644 index 0000000000..d973dcedae --- /dev/null +++ b/eventmesh-spi/build.gradle @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ \ No newline at end of file diff --git a/eventmesh-spi/gradle.properties b/eventmesh-spi/gradle.properties new file mode 100644 index 0000000000..d0503c3b7b --- /dev/null +++ b/eventmesh-spi/gradle.properties @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +group=org.apache.eventmesh +version=1.2.0-SNAPSHOT +jdk=1.8 +snapshot=false diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java new file mode 100644 index 0000000000..6aea9db11d --- /dev/null +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; + +public enum EventMeshExtensionFactory { + ; + + public static T getExtension(Class extensionType, String extensionName) { + if (extensionType == null) { + throw new ExtensionException("extensionType is null"); + } + if (StringUtils.isEmpty(extensionName)) { + throw new ExtensionException("extensionName is null"); + } + if (!extensionType.isInterface() || !extensionType.isAnnotationPresent(EventMeshSPI.class)) { + throw new ExtensionException(String.format("extensionType:%s is invalided", extensionType)); + } + return EventMeshExtensionLoader.getExtension(extensionType, extensionName); + } +} diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java new file mode 100644 index 0000000000..89696e04da --- /dev/null +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshExtensionLoader.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Enumeration; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +public enum EventMeshExtensionLoader { + ; + + private static final Logger logger = LoggerFactory.getLogger(EventMeshExtensionLoader.class); + + private static final ConcurrentHashMap, ConcurrentHashMap>> EXTENSION_CLASS_LOAD_CACHE = new ConcurrentHashMap<>(16); + + private static final ConcurrentHashMap EXTENSION_INSTANCE_CACHE = new ConcurrentHashMap<>(16); + + private static final String EVENTMESH_EXTENSION_DIR = "META-INF/eventmesh/"; + + @SuppressWarnings("unchecked") + public static T getExtension(Class extensionType, String extensionName) { + if (!hasLoadExtensionClass(extensionType)) { + loadExtensionClass(extensionType); + } + if (!hasInitializeExtension(extensionName)) { + initializeExtension(extensionType, extensionName); + } + return (T) EXTENSION_INSTANCE_CACHE.get(extensionName); + } + + private static void initializeExtension(Class extensionType, String extensionName) { + ConcurrentHashMap> extensionClassMap = EXTENSION_CLASS_LOAD_CACHE.get(extensionType); + if (extensionClassMap == null) { + throw new ExtensionException(String.format("Extension type:%s has not been loaded", extensionType)); + } + if (!extensionClassMap.containsKey(extensionName)) { + throw new ExtensionException(String.format("Extension name:%s has not been loaded", extensionName)); + } + Class aClass = extensionClassMap.get(extensionName); + try { + Object extensionObj = aClass.newInstance(); + logger.info("initialize extension instance success, extensionType: {}, extensionName: {}", extensionType, extensionName); + EXTENSION_INSTANCE_CACHE.put(extensionName, extensionObj); + } catch (InstantiationException | IllegalAccessException e) { + throw new ExtensionException("Extension initialize error", e); + } + } + + public static void loadExtensionClass(Class extensionType) { + String extensionFileName = EVENTMESH_EXTENSION_DIR + extensionType.getName(); + ClassLoader classLoader = EventMeshExtensionLoader.class.getClassLoader(); + try { + Enumeration extensionUrls = classLoader.getResources(extensionFileName); + if (extensionUrls != null) { + while (extensionUrls.hasMoreElements()) { + URL url = extensionUrls.nextElement(); + loadResources(url, extensionType); + } + } + } catch (IOException e) { + throw new ExtensionException("load extension class error", e); + } + + + } + + private static void loadResources(URL url, Class extensionType) throws IOException { + try (InputStream inputStream = url.openStream()) { + Properties properties = new Properties(); + properties.load(inputStream); + properties.forEach((extensionName, extensionClass) -> { + String extensionNameStr = (String) extensionName; + String extensionClassStr = (String) extensionClass; + try { + Class targetClass = Class.forName(extensionClassStr); + logger.info("load extension class success, extensionType: {}, extensionClass: {}", extensionType, targetClass); + if (!extensionType.isAssignableFrom(targetClass)) { + throw new ExtensionException( + String.format("class: %s is not subClass of %s", targetClass, extensionType)); + } + EXTENSION_CLASS_LOAD_CACHE.computeIfAbsent(extensionType, k -> new ConcurrentHashMap<>()) + .put(extensionNameStr, targetClass); + } catch (ClassNotFoundException e) { + throw new ExtensionException("load extension class error", e); + } + }); + } + } + + private static boolean hasLoadExtensionClass(Class extensionType) { + return EXTENSION_CLASS_LOAD_CACHE.containsKey(extensionType); + } + + private static boolean hasInitializeExtension(String extensionName) { + return EXTENSION_INSTANCE_CACHE.containsKey(extensionName); + } +} diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshSPI.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshSPI.java new file mode 100644 index 0000000000..0ea72d431b --- /dev/null +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/EventMeshSPI.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Just as a marker for SPI + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface EventMeshSPI { + +} + diff --git a/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/ExtensionException.java b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/ExtensionException.java new file mode 100644 index 0000000000..874f03da5d --- /dev/null +++ b/eventmesh-spi/src/main/java/org/apache/eventmesh/spi/ExtensionException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi; + +public class ExtensionException extends RuntimeException { + + public ExtensionException(Exception e) { + super(e); + } + + public ExtensionException(String message) { + super(message); + } + + public ExtensionException(String message, Exception e) { + super(message, e); + } +} diff --git a/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/EventMeshExtensionFactoryTest.java b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/EventMeshExtensionFactoryTest.java new file mode 100644 index 0000000000..649f4b18b2 --- /dev/null +++ b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/EventMeshExtensionFactoryTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi; + +import org.junit.Test; + +public class EventMeshExtensionFactoryTest { + + @Test + public void getExtension() { + TestExtension extensionA = EventMeshExtensionFactory.getExtension(TestExtension.class, "extensionA"); + extensionA.hello(); + } +} \ No newline at end of file diff --git a/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/ExtensionA.java b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/ExtensionA.java new file mode 100644 index 0000000000..03513e6203 --- /dev/null +++ b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/ExtensionA.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi; + +public class ExtensionA implements TestExtension { + + @Override + public void hello() { + System.out.println("I am ExtensionA"); + } +} diff --git a/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/TestExtension.java b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/TestExtension.java new file mode 100644 index 0000000000..c0c9888130 --- /dev/null +++ b/eventmesh-spi/src/test/java/org/apache/eventmesh/spi/TestExtension.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.spi; + +@EventMeshSPI +public interface TestExtension { + + void hello(); +} diff --git a/eventmesh-spi/src/test/resources/META-INF/eventmesh/org.apache.eventmesh.spi.TestExtension b/eventmesh-spi/src/test/resources/META-INF/eventmesh/org.apache.eventmesh.spi.TestExtension new file mode 100644 index 0000000000..3862ccbb4f --- /dev/null +++ b/eventmesh-spi/src/test/resources/META-INF/eventmesh/org.apache.eventmesh.spi.TestExtension @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +extensionA=org.apache.eventmesh.spi.ExtensionA \ No newline at end of file diff --git a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java index 9d3af8ffe4..329f2bc648 100644 --- a/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java +++ b/eventmesh-test/src/main/java/org/apache/eventmesh/http/demo/SyncRequestInstance.java @@ -34,10 +34,15 @@ public class SyncRequestInstance { public static void main(String[] args) throws Exception { LiteProducer liteProducer = null; + String eventMeshIPPort = "127.0.0.1:10105"; + String topic = "EventMesh.SyncRequestInstance"; try { - String eventMeshIPPort = args[0]; - - final String topic = args[1]; + if (args.length > 0 && StringUtils.isNotBlank(args[0])) { + eventMeshIPPort = args[0]; + } + if (args.length > 1 && StringUtils.isNotBlank(args[1])) { + topic = args[1]; + } if (StringUtils.isBlank(eventMeshIPPort)) { // if has multi value, can config as: 127.0.0.1:10105;127.0.0.2:10105 diff --git a/settings.gradle b/settings.gradle index ea31ffdc16..2b5e0af662 100644 --- a/settings.gradle +++ b/settings.gradle @@ -17,5 +17,12 @@ rootProject.name = 'EventMesh' String jdkVersion = "${jdk}" -include 'eventmesh-runtime','eventmesh-connector-rocketmq','eventmesh-sdk-java','eventmesh-common','eventmesh-connector-api','eventmesh-starter','eventmesh-test' +include 'eventmesh-runtime' +include 'eventmesh-connector-rocketmq' +include 'eventmesh-sdk-java' +include 'eventmesh-common' +include 'eventmesh-connector-api' +include 'eventmesh-starter' +include 'eventmesh-test' +include 'eventmesh-spi'