Skip to content

Commit

Permalink
Merge pull request #5 from apache/master
Browse files Browse the repository at this point in the history
update
  • Loading branch information
Alonexc authored Mar 15, 2023
2 parents f24e9a8 + 1c18672 commit 0af4f3e
Show file tree
Hide file tree
Showing 34 changed files with 1,556 additions and 29 deletions.
16 changes: 8 additions & 8 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ github:
squash: true
merge: true
rebase: false
notifications:
protected_branches:
master:
required_status_checks:
strict: true
required_pull_request_reviews:
dismiss_stale_reviews: true
required_approving_review_count: 1
notifications:
commits: [email protected]
# Send all issue emails (new, closed, comments) to issues@
issues: [email protected]
Expand All @@ -51,10 +58,3 @@ github:
# Send individual PR comments/reviews to issues@
pullrequests_comment: [email protected]
jira_options: link label worklog
protected_branches:
master:
required_status_checks:
strict: true
required_pull_request_reviews:
dismiss_stale_reviews: true
required_approving_review_count: 1
1 change: 1 addition & 0 deletions .github/workflows/greetings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ jobs:
|Users |User support and questions mailing list| [Subscribe](mailto:[email protected]) |[Unsubscribe](mailto:[email protected]) |[Mail Archives](https://lists.apache.org/[email protected])|
|Development |Development related discussions| [Subscribe](mailto:[email protected]) |[Unsubscribe](mailto:[email protected]) |[Mail Archives](https://lists.apache.org/[email protected])|
|Commits |All commits to repositories| [Subscribe](mailto:[email protected]) |[Unsubscribe](mailto:[email protected]) |[Mail Archives](https://lists.apache.org/[email protected])|
|Issues |Issues or PRs comments and reviews| [Subscribe](mailto:[email protected]) |[Unsubscribe](mailto:[email protected]) |[Mail Archives](https://lists.apache.org/[email protected])|
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

# Apache EventMesh (Incubating)

**Apache EventMesh (Incubating)** is a fully serverless platform used to build distributed [event-driven](https://en.wikipedia.org/wiki/Event-driven_architecture) applications.
**Apache EventMesh (Incubating)** is a new generation serverless event middleware for building distributed [event-driven](https://en.wikipedia.org/wiki/Event-driven_architecture) applications.

### EventMesh Architecture

Expand Down Expand Up @@ -110,3 +110,4 @@ Apache EventMesh (Incubating) is licensed under the [Apache License, Version 2.0
|Users|User discussion|[Subscribe](mailto:[email protected])|[Unsubscribe](mailto:[email protected])|[Mail Archives](https://lists.apache.org/[email protected])|
|Development|Development discussion (Design Documents, Issues, etc.)|[Subscribe](mailto:[email protected])|[Unsubscribe](mailto:[email protected])|[Mail Archives](https://lists.apache.org/[email protected])|
|Commits|Commits to related repositories| [Subscribe](mailto:[email protected]) |[Unsubscribe](mailto:[email protected]) |[Mail Archives](https://lists.apache.org/[email protected])|
|Issues|Issues or PRs comments and reviews| [Subscribe](mailto:[email protected]) |[Unsubscribe](mailto:[email protected]) |[Mail Archives](https://lists.apache.org/[email protected])|
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@

package org.apache.eventmesh.common.config;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.convert.Convert;

import org.apache.commons.lang3.StringUtils;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Properties;

Expand Down Expand Up @@ -68,9 +70,15 @@ class PropertiesFileLoad implements FileLoad {
public <T> T getConfig(ConfigInfo configInfo) throws IOException {
final Properties properties = new Properties();
if (StringUtils.isNotBlank(configInfo.getResourceUrl())) {
properties.load(new BufferedReader(new InputStreamReader(getClass().getResourceAsStream(configInfo.getResourceUrl()))));
try (BufferedReader reader = new BufferedReader(new InputStreamReader(
Objects.requireNonNull(getClass().getResourceAsStream(configInfo.getResourceUrl())), Constants.DEFAULT_CHARSET))) {
properties.load(reader);
}
} else {
properties.load(new BufferedReader(new FileReader(configInfo.getFilePath())));
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(Files.newInputStream(Paths.get(configInfo.getFilePath())), Constants.DEFAULT_CHARSET))) {
properties.load(reader);
}
}

if (Objects.isNull(configInfo.getClazz())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void handle(HttpExchange httpExchange) throws IOException {
if (!clientGroupMap.isEmpty()) {
for (ClientGroupWrapper cgw : clientGroupMap.values()) {
Set<Session> listenSessionSet = cgw.getTopic2sessionInGroupMapping().get(topic);
if (listenSessionSet != null && listenSessionSet.isEmpty()) {
if (listenSessionSet != null && !listenSessionSet.isEmpty()) {
result.append(String.format("group:%s", cgw.getGroup())).append(newLine);
for (Session session : listenSessionSet) {
UserAgent userAgent = session.getClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand>
eventSize = Integer.parseInt(Objects.requireNonNull(event.getExtension(SendMessageBatchRequestBody.SIZE)).toString());
CloudEventData eventData = event.getData();

if (eventData != null || StringUtils.isBlank(batchId)
if (eventData == null || StringUtils.isBlank(batchId)
|| StringUtils.isBlank(producerGroup)
|| eventSize != eventList.size()) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@
@Slf4j
public class ProducerTopicManager {

private EventMeshServer eventMeshServer;
private final EventMeshServer eventMeshServer;

private transient ScheduledFuture<?> scheduledTask;

protected static ScheduledExecutorService scheduler;

private ConcurrentHashMap<String, EventMeshServicePubTopicInfo> eventMeshServicePubTopicInfoMap = new ConcurrentHashMap<>(64);
private final ConcurrentHashMap<String, EventMeshServicePubTopicInfo> eventMeshServicePubTopicInfoMap = new ConcurrentHashMap<>(64);

public ProducerTopicManager(EventMeshServer eventMeshServer) {
this.eventMeshServer = eventMeshServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class Session {

private SessionSender sender;

private long createTime = System.currentTimeMillis();
private final long createTime = System.currentTimeMillis();

private long lastHeartbeatTime = System.currentTimeMillis();

Expand All @@ -85,7 +85,7 @@ public class Session {

private boolean listenRspSend = false;

private ReentrantLock listenRspLock = new ReentrantLock();
private final ReentrantLock listenRspLock = new ReentrantLock();

private String listenRequestSeq = null;

Expand Down Expand Up @@ -326,19 +326,17 @@ public void setEventMeshTCPConfiguration(EventMeshTCPConfiguration eventMeshTCPC

public void trySendListenResponse(Header header, long startTime, long taskExecuteTime) {
if (!listenRspSend && listenRspLock.tryLock()) {
if (!listenRspSend) {
if (header == null) {
header = new Header(LISTEN_RESPONSE, OPStatus.SUCCESS.getCode(), "succeed", null);
}
Package msg = new Package();
msg.setHeader(header);

// TODO: if startTime is modified
Utils.writeAndFlush(msg, startTime, taskExecuteTime, context, this);
listenRspSend = true;
if (header == null) {
header = new Header(LISTEN_RESPONSE, OPStatus.SUCCESS.getCode(), "succeed", null);
}
listenRspLock.unlock();
Package msg = new Package();
msg.setHeader(header);

// TODO: if startTime is modified
Utils.writeAndFlush(msg, startTime, taskExecuteTime, context, this);
listenRspSend = true;

listenRspLock.unlock();
}
}

Expand Down
47 changes: 47 additions & 0 deletions eventmesh-storage/eventmesh-storage-mongodb/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

configurations {
implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
implementation.exclude group: 'log4j', module: 'log4j'
}

dependencies {
implementation project(":eventmesh-storage:eventmesh-storage-api")
implementation project(":eventmesh-common")

testImplementation project(":eventmesh-storage:eventmesh-storage-api")
testImplementation project(":eventmesh-common")

implementation 'org.mongodb:mongodb-driver:3.12.11'
testImplementation 'org.mongodb:mongodb-driver:3.12.11'

implementation 'io.cloudevents:cloudevents-json-jackson'
testImplementation 'io.cloudevents:cloudevents-json-jackson'

testImplementation "org.mockito:mockito-core"
testImplementation "org.powermock:powermock-module-junit4"
testImplementation "org.powermock:powermock-api-mockito2"

compileOnly 'org.projectlombok:lombok:1.18.22'
annotationProcessor 'org.projectlombok:lombok:1.18.22'

testCompileOnly 'org.projectlombok:lombok:1.18.22'
testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'

testImplementation 'de.bwaldvogel:mongo-java-server:1.42.0'
}
18 changes: 18 additions & 0 deletions eventmesh-storage/eventmesh-storage-mongodb/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pluginType=storage
pluginName=mongodb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.mongodb.client;

import com.mongodb.ConnectionString;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;

public class MongodbClientManager {
/**
* create mongodb client
*
* @param url url, like: mongodb://root:[email protected]:27018,192.168.74.143:27019
* @return mongodb client
*/
public static MongoClient createMongodbClient(String url) {
ConnectionString connectionString = new ConnectionString(url);
return MongoClients.create(connectionString);
}

/**
* close mongodb client
*
* @param mongoClient mongodb client
*/
public static void closeMongodbClient(MongoClient mongoClient) {
mongoClient.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.mongodb.config;

public class ConfigKey {

public static final String STANDALONE = "STANDALONE";

public static final String REPLICA_SET = "REPLICA_SET";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.mongodb.config;

import org.apache.eventmesh.common.config.Config;
import org.apache.eventmesh.common.config.ConfigFiled;

import lombok.Data;

@Data
@Config(prefix = "eventMesh.server.mongodb", path = "classPath://mongodb-client.properties")
public class ConfigurationHolder {
@ConfigFiled(field = "connectorType")
private String connectorType;

@ConfigFiled(field = "url")
private String url;

@ConfigFiled(field = "database")
private String database;

@ConfigFiled(field = "collection")
private String collection;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.storage.mongodb.constant;

public class MongodbConstants {
public static final String TOPIC = "flag";
public static final String CAPPED_COL_TOPIC_FN = "topic";
public static final String CAPPED_COL_CURSOR_FN = "ts";
public static final String SEQUENCE_COLLECTION_NAME = "pub_sub_seq";
public static final String SEQUENCE_KEY_FN = "topic";
public static final String SEQUENCE_VALUE_FN = "value";

}
Loading

0 comments on commit 0af4f3e

Please sign in to comment.