Skip to content

Commit

Permalink
remove autoCommit
Browse files Browse the repository at this point in the history
  • Loading branch information
ruanwenjun committed Sep 11, 2021
1 parent d33fea2 commit af72021
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class KafkaConsumerConfig {
private Integer maxPartitionFetchBytes;
private Integer sessionTimeoutMs;
private String autoOffsetReset;
private Boolean enableAutoCommit = false;
private Integer maxPollRecords;

public KafkaConsumerConfig(ConfigurationWrapper configurationWrapper) {
Expand Down Expand Up @@ -65,9 +64,6 @@ public void init(ConfigurationWrapper configurationWrapper) {
if (configurationWrapper.getProperty(KafkaConsumerConfigKey.autoOffsetReset) != null) {
this.autoOffsetReset = configurationWrapper.getProperty(KafkaConsumerConfigKey.autoOffsetReset);
}
if (configurationWrapper.getProperty(KafkaConsumerConfigKey.enableAutoCommit) != null) {
this.enableAutoCommit = Boolean.valueOf(configurationWrapper.getProperty(KafkaConsumerConfigKey.enableAutoCommit));
}
if (configurationWrapper.getProperty(KafkaConsumerConfigKey.maxPollRecords) != null) {
this.maxPollRecords = Integer.valueOf(configurationWrapper.getProperty(KafkaConsumerConfigKey.maxPollRecords));
}
Expand Down Expand Up @@ -146,14 +142,6 @@ public void setAutoOffsetReset(String autoOffsetReset) {
this.autoOffsetReset = autoOffsetReset;
}

public Boolean getEnableAutoCommit() {
return enableAutoCommit;
}

public void setEnableAutoCommit(Boolean enableAutoCommit) {
this.enableAutoCommit = enableAutoCommit;
}

public Integer getMaxPollRecords() {
return maxPollRecords;
}
Expand All @@ -172,7 +160,6 @@ public static class KafkaConsumerConfigKey {
public static String maxPartitionFetchBytes = "max.partition.fetch.bytes";
public static String sessionTimeoutMs = "session.timeout.ms";
public static String autoOffsetReset = "auto.offset.reset";
public static String enableAutoCommit = "enable.auto.commit";
public static String maxPollRecords = "max.poll.records";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.eventmesh.connector.kafka.consumer;

import io.openmessaging.api.Action;
import io.openmessaging.api.AsyncConsumeContext;
import io.openmessaging.api.AsyncGenericMessageListener;
import io.openmessaging.api.AsyncMessageListener;
import io.openmessaging.api.Consumer;
Expand All @@ -28,6 +26,8 @@
import io.openmessaging.api.MessageSelector;
import io.openmessaging.api.exception.OMSRuntimeException;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -88,9 +88,6 @@ public void init(KafkaConsumerConfig kafkaConsumerConfig) {
if (kafkaConsumerConfig.getAutoOffsetReset() != null) {
properties.put(KafkaConsumerConfig.KafkaConsumerConfigKey.autoOffsetReset, kafkaConsumerConfig.getAutoOffsetReset());
}
if (kafkaConsumerConfig.getEnableAutoCommit() != null) {
properties.put(KafkaConsumerConfig.KafkaConsumerConfigKey.enableAutoCommit, kafkaConsumerConfig.getEnableAutoCommit());
}
if (kafkaConsumerConfig.getMaxPollRecords() != null) {
properties.put(KafkaConsumerConfig.KafkaConsumerConfigKey.maxPollRecords, kafkaConsumerConfig.getMaxPollRecords());
}
Expand Down Expand Up @@ -169,11 +166,14 @@ public void start() {
String.format("The topic/queue %s isn't attached to this consumer", record.topic()));
}
AtomicBoolean commitSuccess = new AtomicBoolean(false);
AsyncConsumeContext omsContext = new AsyncConsumeContext() {
EventMeshAsyncConsumeContext consumeContext = new EventMeshAsyncConsumeContext() {
@Override
public void commit(Action action) {
if (Action.CommitMessage.equals(action)) {
commitSuccess.set(true);
public void commit(EventMeshAction action) {
switch (action) {
case ManualAck:
commitSuccess.set(true);
case CommitMessage:
case ReconsumeLater:
}
}
};
Expand All @@ -182,8 +182,8 @@ public void commit(Action action) {
Message message = record.value();
message.putUserProperties("offset", String.valueOf(offset));
message.putUserProperties("partition", String.valueOf(partition));
listener.consume(message, omsContext);
if (commitSuccess.get() && kafkaConsumerConfig.getEnableAutoCommit()) {
listener.consume(message, consumeContext);
if (commitSuccess.get()) {
// consumer success
kafkaConsumer.commitSync();
}
Expand Down

0 comments on commit af72021

Please sign in to comment.