Pull in Kafka 2.0 clients #100

Merged
merged 4 commits on Dec 7, 2018
Changes from 1 commit
build.gradle (2 changes: 1 addition, 1 deletion)
@@ -41,7 +41,7 @@ project.ext {
url "https://github.com/linkedin/li-apache-kafka-clients"
}
}
kafkaVersion = "0.11.0.3"
kafkaVersion = "2.0.1"
}

subprojects {
@@ -6,7 +6,7 @@

import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.testng.Assert;
import org.testng.annotations.Test;

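Note: in Kafka 2.0 the SecurityProtocol enum lives in org.apache.kafka.common.security.auth rather than org.apache.kafka.common.protocol, which is why every test-harness file below updates the same import. A minimal sketch of the relocated enum in use; the class name and output are illustrative, not part of this PR:

import org.apache.kafka.common.security.auth.SecurityProtocol;

public class SecurityProtocolExample {
  public static void main(String[] args) {
    // Same enum constants as before; only the package changed with the 2.0 client bump.
    SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
    System.out.println(protocol.name());
  }
}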
@@ -6,7 +6,7 @@

import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.testng.Assert;


@@ -18,7 +18,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

@@ -11,7 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.SecurityProtocol;


public abstract class AbstractKafkaIntegrationTestHarness extends AbstractZookeeperTestHarness {
@@ -12,7 +12,7 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;


@@ -10,7 +10,7 @@
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.SecurityProtocol;


public class EmbeddedBrokerBuilder {
@@ -67,7 +67,7 @@ public EmbeddedBrokerBuilder logRetention(long millis) {
this.logRetentionMs = millis;
return this;
}

public EmbeddedBrokerBuilder enable(SecurityProtocol protocol) {
switch (protocol) {
case PLAINTEXT:

Large diffs are not rendered by default.

@@ -13,6 +13,7 @@
import com.linkedin.kafka.clients.largemessage.MessageAssemblerImpl;
import com.linkedin.kafka.clients.largemessage.errors.ConsumerRecordsProcessingException;
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -221,6 +222,11 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
_kafkaConsumer.subscribe(pattern, _consumerRebalanceListener);
}

@Override
public void subscribe(Pattern pattern) {
_kafkaConsumer.subscribe(pattern);
}
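The no-listener subscribe(Pattern) overload above did not exist on the Consumer interface the old 0.11 client line compiled against, hence the new delegation. A usage sketch under that assumption; the helper class, topic pattern, and record types are illustrative:

import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;

final class SubscribeByPatternExample {
  // Hypothetical helper; the consumer would be a LiKafkaConsumerImpl or any Kafka 2.0 Consumer.
  static void subscribeToMetricsTopics(Consumer<byte[], byte[]> consumer) {
    // Pattern subscription without supplying a ConsumerRebalanceListener.
    consumer.subscribe(Pattern.compile("metrics-.*"));
  }
}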

@Override
public void unsubscribe() {
// Clear all the state of the topic in consumer record processor.
@@ -268,45 +274,67 @@ public ConsumerRecords<K, V> poll(long timeout) {
return processedRecords;
}

@Override
public ConsumerRecords<K, V> poll(Duration timeout) {
return poll(timeout.toMillis());
}
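poll(Duration) is the Kafka 2.0 replacement for the deprecated poll(long); the override above simply delegates to the existing millisecond path. A hypothetical poll loop; the record types, timeout, and output format are illustrative:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class PollExample {
  // Hypothetical single-pass drain using the Duration-based poll.
  static void drainOnce(Consumer<String, String> consumer) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
      System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
    }
  }
}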

@Override
public void commitSync() {
// Preserve the high watermark.
commitOffsets(currentOffsetAndMetadataMap(), false, null, true);
commitOffsets(currentOffsetAndMetadataMap(), false, null, true, null);
}

@Override
public void commitSync(Duration timeout) {
commitOffsets(currentOffsetAndMetadataMap(), false, null, true, timeout);
}

@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
// ignore the high watermark.
commitOffsets(offsets, true, null, true);
commitOffsets(offsets, true, null, true, null);
}

@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
commitOffsets(offsets, true, null, true, timeout);
}

@Override
public void commitAsync() {
commitOffsets(currentOffsetAndMetadataMap(), false, null, false);
commitOffsets(currentOffsetAndMetadataMap(), false, null, false, null);
}

@Override
public void commitAsync(OffsetCommitCallback callback) {
// preserve the high watermark.
commitOffsets(currentOffsetAndMetadataMap(), false, callback, false);
commitOffsets(currentOffsetAndMetadataMap(), false, callback, false, null);
}

@Override
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
// Ignore the high watermark.
commitOffsets(offsets, true, callback, false);
commitOffsets(offsets, true, callback, false, null);
}
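Kafka 2.0 adds timed synchronous commits, which is why the private commitOffsets helper below now threads an optional Duration through to the delegate consumer. A sketch of a caller using the bounded commitSync; the partition, offset, and 10-second bound are illustrative:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class TimedCommitExample {
  // Hypothetical helper: commit one offset, giving up after 10 seconds.
  static void commitWithTimeout(Consumer<String, String> consumer, TopicPartition tp, long offset) {
    Map<TopicPartition, OffsetAndMetadata> offsets =
        Collections.singletonMap(tp, new OffsetAndMetadata(offset));
    // Throws org.apache.kafka.common.errors.TimeoutException if the commit cannot complete in time.
    consumer.commitSync(offsets, Duration.ofSeconds(10));
  }
}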

// Private function to avoid duplicate code.
// timeout is used when sync == true only.
private void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets,
boolean ignoreConsumerHighWatermark,
OffsetCommitCallback callback,
boolean sync) {
boolean sync,
Duration timeout) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
_consumerRecordsProcessor.safeOffsetsToCommit(offsets, ignoreConsumerHighWatermark);
if (sync) {
LOG.trace("Committing offsets synchronously: {}", offsetsToCommit);
_kafkaConsumer.commitSync(offsetsToCommit);
if (timeout == null) {
LOG.trace("Committing offsets synchronously: {}", offsetsToCommit);
_kafkaConsumer.commitSync(offsetsToCommit);
} else {
LOG.trace("Committing offsets synchronously with timeout {} ms: {}", timeout.toMillis(), offsetsToCommit);
_kafkaConsumer.commitSync(offsetsToCommit, timeout);
}
} else {
LOG.trace("Committing offsets asynchronously: {}", offsetsToCommit);
_offsetCommitCallback.setUserCallback(callback);
@@ -409,10 +437,24 @@ public void seekToCommitted(Collection<TopicPartition> partitions) {

@Override
public long position(TopicPartition partition) {
return positionMain(partition, null);
}

@Override
public long position(TopicPartition partition, Duration timeout) {
return positionMain(partition, timeout);
}

// A helper method for position() to avoid code duplication
private long positionMain(TopicPartition partition, Duration timeout) {
// Not handling large message here. The position will be actual position.
while (true) { // In Kafka 0.10.x we can get an unbounded number of invalid offset exceptions
try {
return _kafkaConsumer.position(partition);
if (timeout == null) {
return _kafkaConsumer.position(partition);
} else {
return _kafkaConsumer.position(partition, timeout);
}
} catch (OffsetOutOfRangeException | NoOffsetForPartitionException oe) {
handleInvalidOffsetException(oe);
}
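position(TopicPartition, Duration) is the new bounded lookup; in Kafka 2.0 it throws org.apache.kafka.common.errors.TimeoutException if the position cannot be fetched in time. A sketch of one way a caller might handle that; the helper name, 5-second bound, and fallback value are illustrative:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

final class PositionExample {
  // Hypothetical helper: -1 signals that the position could not be determined in time.
  static long positionOrMinusOne(Consumer<String, String> consumer, TopicPartition tp) {
    try {
      return consumer.position(tp, Duration.ofSeconds(5));
    } catch (TimeoutException e) {
      return -1L;
    }
  }
}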
@@ -528,10 +570,25 @@ private void seekAndClear(TopicPartition tp, Long offset) {

@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return committedMain(partition, null);
}

@Override
public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
return committedMain(partition, timeout);
}

// A helper method for committed() to avoid code duplication.
private OffsetAndMetadata committedMain(TopicPartition partition, Duration timeout) {
// Not handling large message here. The committed will be the actual committed value.
// The returned metadata includes the user committed offset and the user committed metadata, separated by the
// first comma.
OffsetAndMetadata offsetAndMetadata = _kafkaConsumer.committed(partition);
OffsetAndMetadata offsetAndMetadata;
if (timeout == null) {
offsetAndMetadata = _kafkaConsumer.committed(partition);
} else {
offsetAndMetadata = _kafkaConsumer.committed(partition, timeout);
}
if (offsetAndMetadata != null) {
String rawMetadata = offsetAndMetadata.metadata();
Long userOffset = LiKafkaClientsUtils.offsetFromWrappedMetadata(rawMetadata);
@@ -566,11 +623,21 @@ public List<PartitionInfo> partitionsFor(String topic) {
return _kafkaConsumer.partitionsFor(topic);
}

@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
return _kafkaConsumer.partitionsFor(topic, timeout);
}

@Override
public Map<String, List<PartitionInfo>> listTopics() {
return _kafkaConsumer.listTopics();
}

@Override
public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
return _kafkaConsumer.listTopics(timeout);
}
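The metadata lookups (partitionsFor, listTopics) gain Duration overloads as well and are plain pass-throughs here. A short sketch using the bounded partitionsFor; the topic name and 3-second bound are illustrative:

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;

final class MetadataLookupExample {
  // Hypothetical lookup bounded to 3 seconds instead of the default API timeout.
  static int partitionCount(Consumer<String, String> consumer, String topic) {
    List<PartitionInfo> partitions = consumer.partitionsFor(topic, Duration.ofSeconds(3));
    return partitions == null ? 0 : partitions.size();
  }
}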

@Override
public Set<TopicPartition> paused() {
return _kafkaConsumer.paused();
@@ -591,16 +658,31 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
return _kafkaConsumer.offsetsForTimes(timestampsToSearch);
}

@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
return _kafkaConsumer.offsetsForTimes(timestampsToSearch, timeout);
}

@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
return _kafkaConsumer.beginningOffsets(partitions);
}

@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
return _kafkaConsumer.beginningOffsets(partitions, timeout);
}

@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
return _kafkaConsumer.endOffsets(partitions);
}

@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
return _kafkaConsumer.endOffsets(partitions, timeout);
}
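The offset lookups (offsetsForTimes, beginningOffsets, endOffsets) follow the same pass-through pattern. A sketch using the bounded endOffsets; the partition and 5-second timeout are illustrative:

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class EndOffsetsExample {
  // Hypothetical helper: fetch the current end offset of one partition within 5 seconds.
  static long endOffset(Consumer<String, String> consumer, TopicPartition tp) {
    Map<TopicPartition, Long> ends = consumer.endOffsets(Collections.singleton(tp), Duration.ofSeconds(5));
    return ends.get(tp);
  }
}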

@Override
public Long safeOffset(TopicPartition tp, long messageOffset) {
return _consumerRecordsProcessor.safeOffset(tp, messageOffset);
@@ -648,6 +730,11 @@ public void close(long timeout, TimeUnit timeUnit) {
LOG.info("Shutdown complete in {} millis", (System.currentTimeMillis() - start));
}

@Override
public void close(Duration timeout) {
close(timeout.toMillis(), TimeUnit.MILLISECONDS);
}
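close(Duration) supersedes the deprecated close(long, TimeUnit) in Kafka 2.0 and is mapped here onto the existing millisecond path. A sketch of shutdown usage; the 30-second bound is illustrative:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;

final class CloseExample {
  // Hypothetical shutdown: wait at most 30 seconds for outstanding work before closing.
  static void shutdown(Consumer<String, String> consumer) {
    consumer.close(Duration.ofSeconds(30));
  }
}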

@Override
public void wakeup() {
_kafkaConsumer.wakeup();
@@ -33,10 +33,10 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
});

for (int i = 0; i < numSegments - 1; i++) {
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null);
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0), null);
assertTrue("The user callback should not be fired.", callbackFired.get() == 0);
}
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), null);
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0), null);
assertTrue("The user callback should not be fired.", callbackFired.get() == 1);
}
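These test edits track the Kafka 2.0 RecordMetadata constructor, where the checksum argument is a boxed Long (it may be null), so the bare int literal 0 no longer converts implicitly. A sketch under that assumption; the topic name and values are illustrative:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

final class RecordMetadataExample {
  static RecordMetadata dummyMetadata() {
    // (topicPartition, baseOffset, relativeOffset, timestamp, checksum, serializedKeySize, serializedValueSize)
    // The checksum parameter is a Long in Kafka 2.0, hence 0L.
    return new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0);
  }
}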

@@ -59,10 +59,10 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (i == 3) {
e = e1;
}
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), e);
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0), e);
assertTrue("The user callback should not be fired.", callbackFired.get() == 0);
}
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0, 0, 0), e2);
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0), e2);
assertTrue("The user callback should not be fired.", callbackFired.get() == 1);
}
}