diff --git a/.asf.yaml b/.asf.yaml index 66dc6d7649..63fe6434db 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -37,8 +37,8 @@ github: - microservice - state-management enabled_merge_buttons: + merge: true squash: true - merge: false rebase: false protected_branches: master: diff --git a/README.md b/README.md index cce12ce9bc..9e27b02873 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ [![Total alerts](https://img.shields.io/lgtm/alerts/g/apache/incubator-eventmesh.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/apache/incubator-eventmesh/alerts/) [![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/incubator-eventmesh/releases) [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) +[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://join.slack.com/t/apacheeventmesh/shared_invite/zt-yx3n2ak7-HcVG98CDqb~7PwgoDzgfMA) [点我查看中文版](README.zh-CN.md) @@ -47,7 +48,7 @@ Event & Service - [ ] Event transaction - [ ] At-least-once/at-most-once delivery guarantees -Connector +Store Connector - [x] RocketMQ - [x] InMemory - [ ] Federated @@ -64,6 +65,8 @@ Protocol - [ ] gRPC - [ ] CloudEvents - [ ] MQTT +- [ ] WebSocket +- [ ] AMQP - [ ] AsyncAPI SDK @@ -90,7 +93,7 @@ Governance - [x] Client management - [ ] Topic management - [ ] Metadata registry -- [x] Schema registry +- [ ] Schema registry - [ ] Dynamic config Choreography @@ -105,7 +108,7 @@ Runtime - [ ] WebAssembly runtime ## Quick Start -1. [Connector quickstart](https://rocketmq.apache.org/docs/quick-start/) (RocketMQ, ignore this step if use standalone). +1. [Store quickstart](docs/en/instructions/eventmesh-store-quickstart.md) 2. [Runtime quickstart](docs/en/instructions/eventmesh-runtime-quickstart.md) or [Runtime quickstart with docker](docs/en/instructions/eventmesh-runtime-quickstart-with-docker.md). 3. [Java SDK examples](docs/en/instructions/eventmesh-sdk-java-quickstart.md). @@ -127,13 +130,9 @@ EventMesh enriches the byte[] serialize(String topic, Class data) throws JsonProcessingException { + if (data == null) { + return null; + } + return objectMapper.writeValueAsBytes(data); + } + + public static String toJson(Object obj) throws JsonProcessingException { + if (obj == null) { + return null; + } + return objectMapper.writeValueAsString(obj); + } + + public static T toObject(String json, Class clazz) throws JsonProcessingException { + return objectMapper.readValue(json, clazz); + } + + public static T deserialize(Class clazz, byte[] bytes) throws IOException { + if (bytes == null || bytes.length == 0) { + return null; + } + + return objectMapper.readValue(bytes, clazz); + } + + public static T deserialize(Class clazz, String json) throws IOException { + if (json == null || json.length() == 0) { + return null; + } + + return objectMapper.readValue(json, clazz); + } +} diff --git a/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/NetUtils.java b/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/NetUtils.java new file mode 100644 index 0000000000..c6c7992d2e --- /dev/null +++ b/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/NetUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.rocketmq.util; + +import org.apache.http.Consts; + +import java.io.IOException; +import java.io.InputStreamReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.sun.net.httpserver.HttpExchange; + +public class NetUtils { + + private static final Logger logger = LoggerFactory.getLogger(NetUtils.class); + + public static String parsePostBody(HttpExchange exchange) + throws IOException { + StringBuilder body = new StringBuilder(); + if ("post".equalsIgnoreCase(exchange.getRequestMethod()) + || "put".equalsIgnoreCase(exchange.getRequestMethod())) { + try (InputStreamReader reader = + new InputStreamReader(exchange.getRequestBody(), Consts.UTF_8)) { + char[] buffer = new char[256]; + int read; + while ((read = reader.read(buffer)) != -1) { + body.append(buffer, 0, read); + } + } + } + return body.toString(); + } +} + diff --git a/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/RequestMapping.java b/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/RequestMapping.java new file mode 100644 index 0000000000..b093d3e9f3 --- /dev/null +++ b/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/RequestMapping.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.rocketmq.util; + +import com.sun.net.httpserver.HttpExchange; + +public class RequestMapping { + + public static boolean postMapping(String value, HttpExchange httpExchange) { + if ("post".equalsIgnoreCase(httpExchange.getRequestMethod())) { + String requestUri = httpExchange.getRequestURI().getPath(); + UrlMappingPattern matcher = new UrlMappingPattern(value); + return matcher.matches(requestUri); + } + return false; + } + + public static boolean getMapping(String value, HttpExchange httpExchange) { + if ("get".equalsIgnoreCase(httpExchange.getRequestMethod())) { + String requestUri = httpExchange.getRequestURI().getPath(); + UrlMappingPattern matcher = new UrlMappingPattern(value); + return matcher.matches(requestUri); + } + return false; + } + + public static boolean putMapping(String value, HttpExchange httpExchange) { + if ("put".equalsIgnoreCase(httpExchange.getRequestMethod())) { + String requestUri = httpExchange.getRequestURI().getPath(); + UrlMappingPattern matcher = new UrlMappingPattern(value); + return matcher.matches(requestUri); + } + return false; + } + + public static boolean deleteMapping(String value, HttpExchange httpExchange) { + if ("delete".equalsIgnoreCase(httpExchange.getRequestMethod())) { + String requestUri = httpExchange.getRequestURI().getPath(); + UrlMappingPattern matcher = new UrlMappingPattern(value); + return matcher.matches(requestUri); + } + return false; + } + +} diff --git a/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/UrlMappingPattern.java b/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/UrlMappingPattern.java new file mode 100644 index 0000000000..0e31b13ca1 --- /dev/null +++ b/eventmesh-admin/eventmesh-admin-rocketmq/src/main/java/org/apache/eventmesh/admin/rocketmq/util/UrlMappingPattern.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.admin.rocketmq.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class UrlMappingPattern { + + private static final String URL_PARAMETER_REGEX = "\\{(\\w*?)\\}"; + + private static final String URL_PARAMETER_MATCH_REGEX = + "\\([%\\\\w-.\\\\~!\\$&'\\\\(\\\\)\\\\*\\\\+,;=:\\\\[\\\\]@]+?\\)"; + + private static final Pattern URL_PARAMETER_PATTERN = Pattern.compile(URL_PARAMETER_REGEX); + + private static final String URL_FORMAT_REGEX = "(?:\\.\\{format\\})$"; + + private static final String URL_FORMAT_MATCH_REGEX = "(?:\\\\.\\([\\\\w%]+?\\))?"; + + private static final String URL_QUERY_STRING_REGEX = "(?:\\?.*?)?$"; + + private String urlMappingPattern; + + private Pattern compiledUrlMappingPattern; + + private List paramNames = new ArrayList(); + + public UrlMappingPattern(String pattern) { + super(); + setUrlMappingPattern(pattern); + compile(); + } + + public String getMappingPattern() { + return getUrlMappingPattern().replaceFirst(URL_FORMAT_REGEX, ""); + } + + private String getUrlMappingPattern() { + return urlMappingPattern; + } + + public Map extractPathParameterValues(String url) { + Matcher matcher = compiledUrlMappingPattern.matcher(url); + if (matcher.matches()) { + return extractParameters(matcher); + } + return null; + } + + public boolean matches(String url) { + return (extractPathParameterValues(url) != null); + } + + public void compile() { + acquireParamNames(); + String parsedPattern = + getUrlMappingPattern().replaceFirst(URL_FORMAT_REGEX, URL_FORMAT_MATCH_REGEX); + parsedPattern = parsedPattern.replaceAll(URL_PARAMETER_REGEX, URL_PARAMETER_MATCH_REGEX); + this.compiledUrlMappingPattern = Pattern.compile(parsedPattern + URL_QUERY_STRING_REGEX); + } + + private void acquireParamNames() { + Matcher m = URL_PARAMETER_PATTERN.matcher(getUrlMappingPattern()); + while (m.find()) { + paramNames.add(m.group(1)); + } + } + + private Map extractParameters(Matcher matcher) { + Map values = new HashMap(); + for (int i = 0; i < matcher.groupCount(); i++) { + String value = matcher.group(i + 1); + + if (value != null) { + values.put(paramNames.get(i), value); + } + } + return values; + } + + private void setUrlMappingPattern(String pattern) { + this.urlMappingPattern = pattern; + } + + public List getParamNames() { + return Collections.unmodifiableList(paramNames); + } +} diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle index 699e9c4fdb..715ae91462 100644 --- a/eventmesh-common/build.gradle +++ b/eventmesh-common/build.gradle @@ -16,21 +16,17 @@ */ dependencies { - api "org.apache.commons:commons-lang3" - api "org.apache.commons:commons-collections4" api "com.google.guava:guava" api "org.slf4j:slf4j-api" api "junit:junit" api "org.assertj:assertj-core" - - implementation "commons-io:commons-io" - implementation "org.apache.commons:commons-text" + api "org.apache.commons:commons-collections4" + api "org.apache.commons:commons-text" + api "org.apache.commons:commons-lang3" implementation "org.apache.logging.log4j:log4j-api" implementation "org.apache.logging.log4j:log4j-core" - implementation "org.apache.logging.log4j:log4j-core" implementation "org.apache.logging.log4j:log4j-slf4j-impl" - implementation "org.apache.logging.log4j:log4j-web" implementation "com.lmax:disruptor" @@ -50,6 +46,25 @@ dependencies { testCompileOnly 'org.projectlombok:lombok:1.18.22' testAnnotationProcessor 'org.projectlombok:lombok:1.18.22' + testImplementation "org.apache.commons:commons-lang3" + + testImplementation "com.google.guava:guava" + + testImplementation "org.slf4j:slf4j-api" + testImplementation "org.apache.logging.log4j:log4j-api" + testImplementation "org.apache.logging.log4j:log4j-core" + testImplementation "org.apache.logging.log4j:log4j-slf4j-impl" + + testImplementation "com.lmax:disruptor" + + testImplementation "com.fasterxml.jackson.core:jackson-databind" + testImplementation "com.fasterxml.jackson.core:jackson-core" + testImplementation "com.fasterxml.jackson.core:jackson-annotations" + + testImplementation "org.apache.httpcomponents:httpclient" + + testImplementation "io.netty:netty-all" + testImplementation "junit:junit" testImplementation "org.assertj:assertj-core" diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java new file mode 100644 index 0000000000..82ca6a514e --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/Command.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.admin.command; + +import org.apache.eventmesh.connector.rocketmq.common.EventMeshConstants; +import org.apache.eventmesh.connector.rocketmq.config.ClientConfiguration; +import org.apache.eventmesh.connector.rocketmq.config.ConfigurationWrapper; + +import org.apache.rocketmq.acl.common.AclClientRPCHook; +import org.apache.rocketmq.acl.common.SessionCredentials; +import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; + +import java.io.File; +import java.util.UUID; + +public abstract class Command { + protected DefaultMQAdminExt adminExt; + + protected String nameServerAddr; + protected String clusterName; + + public void init() { + ConfigurationWrapper configurationWrapper = + new ConfigurationWrapper(EventMeshConstants.EVENTMESH_CONF_HOME + + File.separator + + EventMeshConstants.EVENTMESH_CONF_FILE, false); + final ClientConfiguration clientConfiguration = + new ClientConfiguration(configurationWrapper); + clientConfiguration.init(); + + nameServerAddr = clientConfiguration.namesrvAddr; + clusterName = clientConfiguration.clusterName; + String accessKey = clientConfiguration.accessKey; + String secretKey = clientConfiguration.secretKey; + + RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)); + adminExt = new DefaultMQAdminExt(rpcHook); + String groupId = UUID.randomUUID().toString(); + adminExt.setAdminExtGroup("admin_ext_group-" + groupId); + adminExt.setNamesrvAddr(nameServerAddr); + } + + public abstract void execute() throws Exception; +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java new file mode 100644 index 0000000000..53f1822560 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/admin/command/CreateTopicCommand.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.connector.rocketmq.admin.command; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.tools.command.CommandUtil; + +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CreateTopicCommand extends Command { + public Logger logger = LoggerFactory.getLogger(this.getClass()); + + private int numOfQueue = 4; + private int queuePermission = 6; + private String topicName = ""; + + @Override + public void execute() throws Exception { + if (StringUtils.isBlank(topicName)) { + logger.error("Topic name can not be blank."); + throw new Exception("Topic name can not be blank."); + } + try { + init(); + adminExt.start(); + Set brokersAddr = CommandUtil.fetchMasterAddrByClusterName( + adminExt, clusterName); + for (String masterAddr : brokersAddr) { + TopicConfig topicConfig = new TopicConfig(); + topicConfig.setTopicName(topicName); + topicConfig.setReadQueueNums(numOfQueue); + topicConfig.setWriteQueueNums(numOfQueue); + topicConfig.setPerm(queuePermission); + adminExt.createAndUpdateTopicConfig(masterAddr, topicConfig); + logger.info("Topic {} is created for RocketMQ broker {}", topicName, masterAddr); + } + } finally { + adminExt.shutdown(); + } + } + + public int getNumOfQueue() { + return numOfQueue; + } + + public int getQueuePermission() { + return queuePermission; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public void setNumOfQueue(int numOfQueue) { + this.numOfQueue = numOfQueue; + } + + public void setQueuePermission(int permission) { + this.queuePermission = permission; + } +} diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java index f1b9335bbe..feadca6319 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/java/org/apache/eventmesh/connector/rocketmq/config/ClientConfiguration.java @@ -36,6 +36,9 @@ public class ClientConfiguration { public Integer pollNameServerInterval = 10 * 1000; public Integer heartbeatBrokerInterval = 30 * 1000; public Integer rebalanceInterval = 20 * 1000; + public String clusterName = ""; + public String accessKey = ""; + public String secretKey = ""; protected ConfigurationWrapper configurationWrapper; @@ -101,13 +104,17 @@ public void init() { pullBatchSize = Integer.valueOf(clientPullBatchSizeStr); } - String clientPollNamesrvIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); + String clientPollNamesrvIntervalStr = + configurationWrapper.getProp( + ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL); if (StringUtils.isNotEmpty(clientPollNamesrvIntervalStr)) { Preconditions.checkState(StringUtils.isNumeric(clientPollNamesrvIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_POLL_NAMESRV_INTERVAL)); pollNameServerInterval = Integer.valueOf(clientPollNamesrvIntervalStr); } - String clientHeartbeatBrokerIntervalStr = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); + String clientHeartbeatBrokerIntervalStr = + configurationWrapper.getProp( + ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL); if (StringUtils.isNotEmpty(clientHeartbeatBrokerIntervalStr)) { Preconditions.checkState(StringUtils.isNumeric(clientHeartbeatBrokerIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL)); heartbeatBrokerInterval = Integer.valueOf(clientHeartbeatBrokerIntervalStr); @@ -118,6 +125,21 @@ public void init() { Preconditions.checkState(StringUtils.isNumeric(clientRebalanceIntervalIntervalStr), String.format("%s error", ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL)); rebalanceInterval = Integer.valueOf(clientRebalanceIntervalIntervalStr); } + + String cluster = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_CLUSTER); + if (StringUtils.isNotBlank(cluster)) { + clusterName = cluster; + } + + String ak = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY); + if (StringUtils.isNotBlank(ak)) { + accessKey = ak; + } + + String sk = configurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY); + if (StringUtils.isNotBlank(sk)) { + secretKey = sk; + } } static class ConfKeys { @@ -147,6 +169,14 @@ static class ConfKeys { public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_HEARTBEAT_BROKER_INTERVAL = "eventMesh.server.rocketmq.client.heartbeatBrokerInterval"; public static String KEYS_EVENTMESH_ROCKETMQ_CLIENT_REBALANCE_INTERVAL = "eventMesh.server.rocketmq.client.rebalanceInterval"; + + public static String KEYS_EVENTMESH_ROCKETMQ_CLUSTER = "eventMesh.server.rocketmq.cluster"; + + public static String KEYS_EVENTMESH_ROCKETMQ_ACCESS_KEY = + "eventMesh.server.rocketmq.accessKey"; + + public static String KEYS_EVENTMESH_ROCKETMQ_SECRET_KEY = + "eventMesh.server.rocketmq.secretKey"; } } \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties index 1261f30e2c..8e0822ad36 100644 --- a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/main/resources/rocketmq-client.properties @@ -16,3 +16,6 @@ # #######################rocketmq-client################## eventMesh.server.rocketmq.namesrvAddr=127.0.0.1:9876;127.0.0.1:9876 +eventMesh.server.rocketmq.cluster=DefaultCluster +eventMesh.server.rocketmq.accessKey=******** +eventMesh.server.rocketmq.secretKey=******** diff --git a/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java new file mode 100644 index 0000000000..16ff257b79 --- /dev/null +++ b/eventmesh-connector-plugin/eventmesh-connector-rocketmq/src/test/java/org/apache/eventmesh/producer/DefaultProducerImplTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.producer; + + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.eventmesh.api.producer.MeshMQProducer; +import org.apache.eventmesh.connector.rocketmq.producer.RocketMQProducerImpl; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.openmessaging.api.exception.OMSRuntimeException; + +public class DefaultProducerImplTest { + + @Before + public void before() {} + + + @After + public void after() { + //TBD:Remove topic + } + + @Test + public void testCreate_EmptyTopic() { + MeshMQProducer meshPub = new RocketMQProducerImpl(); + try { + meshPub.createTopic(" "); + } catch (OMSRuntimeException e) { + assertThat(e.getMessage()).isEqualToIgnoringWhitespace("RocketMQ can not create topic"); + } + } + + @Test + public void testCreate_NullTopic() { + MeshMQProducer meshPub = new RocketMQProducerImpl(); + try { + meshPub.createTopic(null); + } catch (OMSRuntimeException e) { + String errorMessage = e.getMessage(); + assertThat(errorMessage).isEqualTo("RocketMQ can not create topic null"); + } + } +} \ No newline at end of file diff --git a/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneMeshMQProducerAdaptor.java b/eventmesh-connector-plugin/eventmesh-connector-standalone/src/main/java/org/apache/eventmesh/connector/standalone/producer/StandaloneMeshMQProducerAdaptor.java new file mode 100644 index 0000000000..e69de29bb2 diff --git a/eventmesh-runtime/build.gradle b/eventmesh-runtime/build.gradle index 5bf0df28cb..e4e19031a7 100644 --- a/eventmesh-runtime/build.gradle +++ b/eventmesh-runtime/build.gradle @@ -35,13 +35,30 @@ dependencies { implementation project(":eventmesh-security-plugin:eventmesh-security-acl") implementation project(":eventmesh-registry-plugin:eventmesh-registry-api") implementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv") + implementation project(":eventmesh-admin:eventmesh-admin-rocketmq") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") + // for debug only, can be removed implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-cloudevents") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-meshmessage") implementation project(":eventmesh-protocol-plugin:eventmesh-protocol-openmessage") + testImplementation project(":eventmesh-common") + testImplementation project(":eventmesh-spi") + testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api") + testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-standalone") + testImplementation project(":eventmesh-security-plugin:eventmesh-security-api") + testImplementation project(":eventmesh-security-plugin:eventmesh-security-acl") + testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-api") + testImplementation project(":eventmesh-registry-plugin:eventmesh-registry-namesrv") + testImplementation project(":eventmesh-admin:eventmesh-admin-rocketmq") + + testImplementation project(":eventmesh-protocol-plugin:eventmesh-protocol-api") + + testImplementation "org.apache.httpcomponents:httpclient" + testImplementation "io.netty:netty-all" + testImplementation "org.mockito:mockito-core" testImplementation "org.powermock:powermock-module-junit4" testImplementation "org.powermock:powermock-api-mockito2" diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java index cd6d094db5..3fc2e8e2b8 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/admin/controller/ClientManageController.java @@ -17,21 +17,34 @@ package org.apache.eventmesh.runtime.admin.controller; +import org.apache.eventmesh.admin.rocketmq.controller.AdminController; +import org.apache.eventmesh.runtime.admin.handler.QueryRecommendEventMeshHandler; +import org.apache.eventmesh.runtime.admin.handler.RedirectClientByIpPortHandler; +import org.apache.eventmesh.runtime.admin.handler.RedirectClientByPathHandler; +import org.apache.eventmesh.runtime.admin.handler.RedirectClientBySubSystemHandler; +import org.apache.eventmesh.runtime.admin.handler.RejectAllClientHandler; +import org.apache.eventmesh.runtime.admin.handler.RejectClientByIpPortHandler; +import org.apache.eventmesh.runtime.admin.handler.RejectClientBySubSystemHandler; +import org.apache.eventmesh.runtime.admin.handler.ShowClientBySystemHandler; +import org.apache.eventmesh.runtime.admin.handler.ShowClientHandler; +import org.apache.eventmesh.runtime.admin.handler.ShowListenClientByTopicHandler; +import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; + import java.io.IOException; import java.net.InetSocketAddress; -import com.sun.net.httpserver.HttpServer; - -import org.apache.eventmesh.runtime.admin.handler.*; -import org.apache.eventmesh.runtime.boot.EventMeshTCPServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.sun.net.httpserver.HttpServer; + public class ClientManageController { private static final Logger logger = LoggerFactory.getLogger(ClientManageController.class); private EventMeshTCPServer eventMeshTCPServer; + + private AdminController adminController; public ClientManageController(EventMeshTCPServer eventMeshTCPServer) { this.eventMeshTCPServer = eventMeshTCPServer; @@ -50,6 +63,10 @@ public void start() throws IOException { server.createContext("/clientManage/redirectClientByIpPort", new RedirectClientByIpPortHandler(eventMeshTCPServer)); server.createContext("/clientManage/showListenClientByTopic", new ShowListenClientByTopicHandler(eventMeshTCPServer)); server.createContext("/eventMesh/recommend", new QueryRecommendEventMeshHandler(eventMeshTCPServer)); + + adminController = new AdminController(); + adminController.run(server); + server.start(); logger.info("ClientManageController start success, port:{}", port); } diff --git a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java index 02e1c1ec69..0c75e7ab03 100644 --- a/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java +++ b/eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java @@ -70,7 +70,7 @@ public SubscribeProcessor(EventMeshHTTPServer eventMeshHTTPServer) { @Override public void processRequest(ChannelHandlerContext ctx, AsyncContext asyncContext) - throws Exception { + throws Exception { HttpCommand responseEventMeshCommand; final HttpCommand request = asyncContext.getRequest(); final Integer requestCode = Integer.valueOf(asyncContext.getRequest().getRequestCode()); @@ -107,8 +107,8 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext //validate body if (StringUtils.isBlank(subscribeRequestBody.getUrl()) - || CollectionUtils.isEmpty(subscribeRequestBody.getTopics()) - || StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) { + || CollectionUtils.isEmpty(subscribeRequestBody.getTopics()) + || StringUtils.isBlank(subscribeRequestBody.getConsumerGroup())) { responseEventMeshCommand = request.createHttpCommandResponse( subscribeResponseHeader, @@ -129,17 +129,17 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext for (SubscriptionItem item : subTopicList) { try { Acl.doAclCheckInHttpReceive(remoteAddr, user, pass, subsystem, item.getTopic(), - requestCode); + requestCode); } catch (Exception e) { responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse( - subscribeResponseHeader, - SendMessageResponseBody - .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), - e.getMessage())); + subscribeResponseHeader, + SendMessageResponseBody + .buildBody(EventMeshRetCode.EVENTMESH_ACL_ERR.getRetCode(), + e.getMessage())); asyncContext.onComplete(responseEventMeshCommand); aclLogger - .warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e); + .warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e); return; } } @@ -154,7 +154,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext for (SubscriptionItem subTopic : subTopicList) { List groupTopicClients = eventMeshHTTPServer.localClientInfoMapping - .get(consumerGroup + "@" + subTopic.getTopic()); + .get(consumerGroup + "@" + subTopic.getTopic()); if (CollectionUtils.isEmpty(groupTopicClients)) { httpLogger.error("group {} topic {} clients is empty", consumerGroup, subTopic); @@ -171,7 +171,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } } ConsumerGroupConf consumerGroupConf = - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); + eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup); if (consumerGroupConf == null) { // new subscription consumerGroupConf = new ConsumerGroupConf(consumerGroup); @@ -189,7 +189,17 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext } else { // already subscribed Map map = - consumerGroupConf.getConsumerGroupTopicConf(); + consumerGroupConf.getConsumerGroupTopicConf(); + if (!map.containsKey(subTopic.getTopic())) { + //If there are multiple topics, append it + ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf(); + newTopicConf.setConsumerGroup(consumerGroup); + newTopicConf.setTopic(subTopic.getTopic()); + newTopicConf.setSubscriptionItem(subTopic); + newTopicConf.setUrls(new HashSet<>(Arrays.asList(url))); + newTopicConf.setIdcUrls(idcUrls); + map.put(subTopic.getTopic(), newTopicConf); + } for (String key : map.keySet()) { if (StringUtils.equals(subTopic.getTopic(), key)) { ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf(); @@ -203,15 +213,6 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext latestTopicConf.setIdcUrls(idcUrls); map.put(key, latestTopicConf); - } else { - //If there are multiple topics, append it - ConsumerGroupTopicConf newTopicConf = new ConsumerGroupTopicConf(); - newTopicConf.setConsumerGroup(consumerGroup); - newTopicConf.setTopic(subTopic.getTopic()); - newTopicConf.setSubscriptionItem(subTopic); - newTopicConf.setUrls(new HashSet<>(Arrays.asList(url))); - newTopicConf.setIdcUrls(idcUrls); - map.put(subTopic.getTopic(), newTopicConf); } } } @@ -222,7 +223,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext try { // subscription relationship change notification eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, - eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup)); + eventMeshHTTPServer.localConsumerGroupMapping.get(consumerGroup)); final CompleteHandler handler = new CompleteHandler() { @Override @@ -244,18 +245,18 @@ public void onResponse(HttpCommand httpCommand) { asyncContext.onComplete(responseEventMeshCommand, handler); } catch (Exception e) { HttpCommand err = asyncContext.getRequest().createHttpCommandResponse( - subscribeResponseHeader, - SubscribeResponseBody - .buildBody(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getRetCode(), - EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() - + EventMeshUtil.stackTrace(e, 2))); + subscribeResponseHeader, + SubscribeResponseBody + .buildBody(EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getRetCode(), + EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + + EventMeshUtil.stackTrace(e, 2))); asyncContext.onComplete(err); long endTime = System.currentTimeMillis(); httpLogger.error( - "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}" - + "|bizSeqNo={}|uniqueId={}", endTime - startTime, - JsonUtils.serialize(subscribeRequestBody.getTopics()), - subscribeRequestBody.getUrl(), e); + "message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}" + + "|bizSeqNo={}|uniqueId={}", endTime - startTime, + JsonUtils.serialize(subscribeRequestBody.getTopics()), + subscribeRequestBody.getUrl(), e); eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgFailed(); eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsgCost(endTime - startTime); } @@ -285,7 +286,7 @@ private void registerClient(SubscribeRequestHeader subscribeRequestHeader, Strin if (eventMeshHTTPServer.localClientInfoMapping.containsKey(groupTopicKey)) { List localClients = - eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); + eventMeshHTTPServer.localClientInfoMapping.get(groupTopicKey); boolean isContains = false; for (Client localClient : localClients) { if (StringUtils.equals(localClient.url, client.url)) { diff --git a/eventmesh-sdk-java/build.gradle b/eventmesh-sdk-java/build.gradle index a60dc141b8..7b91767b06 100644 --- a/eventmesh-sdk-java/build.gradle +++ b/eventmesh-sdk-java/build.gradle @@ -21,6 +21,7 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-databind" implementation "com.fasterxml.jackson.core:jackson-core" implementation "com.fasterxml.jackson.core:jackson-annotations" + implementation "org.apache.commons:commons-collections4" implementation "io.netty:netty-all" implementation "org.apache.httpcomponents:httpclient" @@ -30,6 +31,15 @@ dependencies { implementation "io.cloudevents:cloudevents-json-jackson" implementation "io.openmessaging:openmessaging-api" + testImplementation project(":eventmesh-common") + + testImplementation "com.fasterxml.jackson.core:jackson-databind" + testImplementation "com.fasterxml.jackson.core:jackson-core" + testImplementation "com.fasterxml.jackson.core:jackson-annotations" + + testImplementation "io.netty:netty-all" + testImplementation "org.apache.httpcomponents:httpclient" + compileOnly 'org.projectlombok:lombok:1.18.22' annotationProcessor 'org.projectlombok:lombok:1.18.22' diff --git a/eventmesh-spi/build.gradle b/eventmesh-spi/build.gradle index f5933ae0f1..0911a87ff9 100644 --- a/eventmesh-spi/build.gradle +++ b/eventmesh-spi/build.gradle @@ -16,4 +16,6 @@ */ dependencies { compileOnly project(":eventmesh-common") + implementation "org.apache.commons:commons-collections4" + testImplementation project(":eventmesh-common") } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 73dc286f04..d51e958500 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,3 +30,5 @@ apacheUserName= #apache password apachePassWord= signEnabled=false + +org.gradle.warning.mode=none diff --git a/settings.gradle b/settings.gradle index 0606b2e0d6..fa13c1efa6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -30,7 +30,6 @@ include 'eventmesh-security-plugin:eventmesh-security-api' include 'eventmesh-security-plugin:eventmesh-security-acl' include 'eventmesh-registry-plugin:eventmesh-registry-api' include 'eventmesh-registry-plugin:eventmesh-registry-namesrv' -include 'eventmesh-admin' include 'eventmesh-schema-registry' include 'eventmesh-schema-registry:eventmesh-schema-registry-server' include 'eventmesh-protocol-plugin' @@ -38,4 +37,5 @@ include 'eventmesh-protocol-plugin:eventmesh-protocol-api' include 'eventmesh-protocol-plugin:eventmesh-protocol-openmessage' include 'eventmesh-protocol-plugin:eventmesh-protocol-cloudevents' include 'eventmesh-protocol-plugin:eventmesh-protocol-meshmessage' +include 'eventmesh-admin:eventmesh-admin-rocketmq' diff --git a/style/checkStyle.xml b/style/checkStyle.xml index 842f960a01..17f5d40e7f 100644 --- a/style/checkStyle.xml +++ b/style/checkStyle.xml @@ -54,7 +54,7 @@ - + @@ -73,6 +73,7 @@ + @@ -262,13 +264,15 @@ + + - + - - - + + + + + - +