Skip to content

Commit

Permalink
Merge pull request #1413 from apache/kafka-connector
Browse files Browse the repository at this point in the history
[ISSUE #199] Kafka connector as event storage plugin
close #199
  • Loading branch information
xwm1992 authored Sep 26, 2022
2 parents b560c2a + e0cdb2c commit a121196
Show file tree
Hide file tree
Showing 33 changed files with 1,942 additions and 8 deletions.
47 changes: 47 additions & 0 deletions eventmesh-connector-plugin/eventmesh-connector-kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
configurations {
implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
implementation.exclude group: 'log4j', module: 'log4j'
}

dependencies {
implementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
implementation project(":eventmesh-common")
// https://mavenlibs.com/maven/dependency/io.cloudevents/cloudevents-kafka
implementation group: 'io.cloudevents', name: 'cloudevents-kafka', version: '2.2.1'

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
implementation 'org.apache.kafka:kafka-clients:3.0.0'

testImplementation 'org.junit.jupiter:junit-jupiter-api:5.6.0'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

testImplementation project(":eventmesh-connector-plugin:eventmesh-connector-api")
testImplementation project(":eventmesh-common")

testImplementation "org.mockito:mockito-core"
testImplementation "org.powermock:powermock-module-junit4"
testImplementation "org.powermock:powermock-api-mockito2"

compileOnly 'org.projectlombok:lombok:1.18.22'
compileOnly 'com.google.code.findbugs:jsr305:3.0.1'
annotationProcessor 'org.projectlombok:lombok:1.18.22'

testCompileOnly 'org.projectlombok:lombok:1.18.22'
testAnnotationProcessor 'org.projectlombok:lombok:1.18.22'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

kafka_version=3.2.0

pluginType=connector
pluginName=kafka
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.common;

public class Constants {

public static final String BROADCAST_PREFIX = "broadcast-";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.common;

public class EventMeshConstants {

public static final String EVENTMESH_CONF_FILE = "kafka-client.properties";

public static final int DEFAULT_TIMEOUT_IN_MILLISECONDS = 3000;

public static final String STORE_TIMESTAMP = "storetime";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.config;

import org.apache.commons.lang3.StringUtils;

import com.google.common.base.Preconditions;

public class ClientConfiguration {

public String namesrvAddr = "";
public String clientUserName = "username";
public String clientPass = "password";
public Integer consumeThreadMin = 2;
public Integer consumeThreadMax = 2;
public Integer consumeQueueSize = 10000;
public Integer pullBatchSize = 32;
public Integer ackWindow = 1000;
public Integer pubWindow = 100;
public long consumeTimeout = 0L;
public Integer pollNameServerInterval = 10 * 1000;
public Integer heartbeatBrokerInterval = 30 * 1000;
public Integer rebalanceInterval = 20 * 1000;
public String clusterName = "";
public String accessKey = "";
public String secretKey = "";

public void init() {
String namesrvAddrStr = ConfigurationWrapper.getProp(ConfKeys.KEYS_EVENTMESH_KAFKA_SERVER_PORT);
Preconditions.checkState(StringUtils.isNotEmpty(namesrvAddrStr),
String.format("%s error", ConfKeys.KEYS_EVENTMESH_KAFKA_SERVER_PORT));
namesrvAddr = StringUtils.trim(namesrvAddrStr);
}

static class ConfKeys {

public static String KEYS_EVENTMESH_KAFKA_SERVER_PORT = "eventMesh.server.kafka.port";

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.config;

import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.kafka.common.EventMeshConstants;

import org.apache.commons.lang3.StringUtils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@UtilityClass
public class ConfigurationWrapper {

private static final Properties properties = new Properties();

static {
loadProperties();
}

public String getProp(String key) {
return StringUtils.isEmpty(key) ? null : properties.getProperty(key, null);
}

/**
* Load kafka properties file from classpath and conf home.
* The properties defined in conf home will override classpath.
*/
private void loadProperties() {
try (InputStream resourceAsStream = ConfigurationWrapper.class.getResourceAsStream(
"/" + EventMeshConstants.EVENTMESH_CONF_FILE)) {
if (resourceAsStream != null) {
properties.load(resourceAsStream);
}
} catch (IOException e) {
throw new RuntimeException(String.format("Load %s.properties file from classpath error", EventMeshConstants.EVENTMESH_CONF_FILE));
}
try {
String configPath = Constants.EVENTMESH_CONF_HOME + File.separator + EventMeshConstants.EVENTMESH_CONF_FILE;
if (new File(configPath).exists()) {
properties.load(new BufferedReader(new FileReader(configPath)));
}
} catch (IOException e) {
throw new IllegalArgumentException(String.format("Cannot load %s file from conf", EventMeshConstants.EVENTMESH_CONF_FILE));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.connector;

import org.apache.eventmesh.api.connector.ConnectorResourceService;

public class ConnectorResourceServiceKafkaImpl implements ConnectorResourceService {
@Override
public void init() throws Exception {

}

@Override
public void release() throws Exception {

}
}
Loading

0 comments on commit a121196

Please sign in to comment.