Skip to content

Commit

Permalink
[ISSUE apache#199] Support Kafka connector plugin and Kafka as event …
Browse files Browse the repository at this point in the history
…store apache#199
  • Loading branch information
ruanwenjun committed Aug 1, 2021
1 parent 0e4def2 commit ed046f3
Show file tree
Hide file tree
Showing 26 changed files with 1,966 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ subprojects {
exclude '**/spring-boot-devtools*.jar'
exclude '**/mumble-sdk-test*.jar'
exclude '*connector-rocketmq*.jar'
exclude '*connector-kafka*.jar'
exclude 'eventmesh-runtime*.jar'
exclude 'commons-collections-3.2.2.jar'
}
Expand Down
40 changes: 40 additions & 0 deletions eventmesh-connector-plugin/eventmesh-connector-kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

configurations {
implementation.exclude group: 'ch.qos.logback', module: 'logback-classic'
}

List kafka = [
"org.apache.kafka:kafka-clients:$kafka_version",
]

List metrics = [
"io.dropwizard.metrics:metrics-core:4.1.0",
"io.dropwizard.metrics:metrics-healthchecks:4.1.0",
"io.dropwizard.metrics:metrics-annotation:4.1.0",
"io.dropwizard.metrics:metrics-json:4.1.0"
]

List open_message = [
"io.openmessaging:openmessaging-api:2.2.1-pubsub"
]

dependencies {
implementation kafka, metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
testImplementation kafka, metrics, open_message, project(":eventmesh-connector-plugin:eventmesh-connector-api")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

group=org.apache.eventmesh
version=1.2.0-SNAPSHOT
kafka_version=2.8.0
mavenUserName=
mavenPassword=
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka;

import io.openmessaging.api.Consumer;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.Producer;
import io.openmessaging.api.PullConsumer;
import io.openmessaging.api.batch.BatchConsumer;
import io.openmessaging.api.order.OrderConsumer;
import io.openmessaging.api.order.OrderProducer;
import io.openmessaging.api.transaction.LocalTransactionChecker;
import io.openmessaging.api.transaction.TransactionProducer;
import org.apache.eventmesh.connector.kafka.consumer.KafkaMQConsumerImpl;
import org.apache.eventmesh.connector.kafka.producer.KafkaMQProducerImpl;

import java.util.Properties;

/**
* The implementation of open message{@link MessagingAccessPoint}
*/
public class MessagingAccessPointImpl implements MessagingAccessPoint {


private final Properties attributesProperties;

public MessagingAccessPointImpl(final Properties attributesProperties) {
this.attributesProperties = attributesProperties;
}


@Override
public String version() {
return OMS.specVersion;
}

@Override
public Properties attributes() {
return attributesProperties;
}

@Override
public Producer createProducer(Properties properties) {
return new KafkaMQProducerImpl(properties);
}

@Override
public OrderProducer createOrderProducer(Properties properties) {
return null;
}

@Override
public TransactionProducer createTransactionProducer(Properties properties, LocalTransactionChecker checker) {
return null;
}

@Override
public TransactionProducer createTransactionProducer(Properties properties) {
return null;
}

@Override
public Consumer createConsumer(Properties properties) {
return new KafkaMQConsumerImpl(properties);
}

@Override
public PullConsumer createPullConsumer(Properties properties) {
return null;
}

@Override
public BatchConsumer createBatchConsumer(Properties properties) {
return null;
}

@Override
public OrderConsumer createOrderedConsumer(Properties properties) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.common;

public enum Constants {
;
public static final String EVENTMESH_CONF_HOME = System.getProperty("confPath", System.getenv("confPath"));

public static final String KAFKA_CONF_FILE = "kafka-client.properties";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.common;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.openmessaging.api.Message;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class OpenMessageDeserializer implements Deserializer<Message> {

private static final ObjectMapper objectMapper = new ObjectMapper();
static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

@Override
public Message deserialize(String topic, byte[] data) {
try {
if (data == null) {
return null;
}
return objectMapper.readValue(new String(data, StandardCharsets.UTF_8), Message.class);
} catch (IOException e) {
throw new SerializationException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.common;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.openmessaging.api.Message;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;

/**
* serialize {@link Message}
*/
public class OpenMessageSerializer implements Serializer<Message> {

private static final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte[] serialize(String topic, Message message) {
try {
if (message == null) {
return null;
}
return objectMapper.writeValueAsString(message).getBytes(StandardCharsets.UTF_8);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.kafka.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

public class ConfigurationWrapper {

private Logger logger = LoggerFactory.getLogger(ConfigurationWrapper.class);

private Properties configProperties = new Properties();

private String configFilePath;

private boolean reload;

public ConfigurationWrapper(String configFilePath, boolean reload) {
this.configFilePath = configFilePath;
this.reload = reload;
init();
}

public String getProperty(String key) {
return configProperties.getProperty(key);
}

private void init() {
load();
if (reload) {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
load();
}
}, 30 * 1000, 30 * 1000);
}
}

private synchronized void load() {
try {
logger.info("loading config: {}", configFilePath);
Path path = Paths.get(configFilePath);
configProperties.load(Files.newBufferedReader(path));
} catch (Exception ex) {
logger.error("loading properties [{}] error", configFilePath, ex);
}
}

}
Loading

0 comments on commit ed046f3

Please sign in to comment.