Pull in Kafka 2.0 clients (#100)
jonlee2 authored Dec 7, 2018
1 parent c0712ce commit 1967b6b
Showing 12 changed files with 178 additions and 451 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -41,7 +41,7 @@ project.ext {
url "https://github.com/linkedin/li-apache-kafka-clients"
}
}
kafkaVersion = "0.11.0.3"
kafkaVersion = "2.0.1"
}

subprojects {
@@ -16,6 +16,7 @@
import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils;
import com.linkedin.kafka.clients.utils.tests.AbstractKafkaClientsIntegrationTestHarness;
import com.linkedin.kafka.clients.utils.tests.KafkaTestUtils;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -56,6 +57,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -438,6 +440,34 @@ public void testCommitWithOffsetMap() {
}
}

+  @Test(expectedExceptions = TimeoutException.class)
+  public void testCommitWithTimeout() {
+    String topic = "testCommitWithTimeout";
+    produceSyntheticMessages(topic);
+    Properties props = new Properties();
+    // All the consumers should have the same group id.
+    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testCommitWithTimeout");
+    // Make sure we start to consume from the beginning.
+    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    // Only fetch one record at a time.
+    props.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");
+    // No auto commit.
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    LiKafkaConsumer<String, String> consumer = createConsumer(props);
+    try {
+      TopicPartition tp = new TopicPartition(topic, SYNTHETIC_PARTITION_0);
+      consumer.assign(Arrays.asList(tp));
+
+      while (consumer.poll(10).isEmpty()) {
+      }
+      // Shut down the broker so that the offset commit hangs and eventually times out.
+      tearDown();
+      consumer.commitSync(Duration.ofSeconds(3));
+    } finally {
+      consumer.close();
+    }
+  }
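Kafka 2.0 adds timed overloads of the consumer's blocking calls; commitSync(Duration) bounds the wait and throws org.apache.kafka.common.errors.TimeoutException when the commit cannot complete in time, which is what the test above provokes by tearing the broker down. A minimal standalone sketch of the same pattern against a plain KafkaConsumer (broker address, group id, and topic are placeholder values):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimedCommitExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "timed-commit-example");    // placeholder
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("example-topic")); // placeholder
      consumer.poll(Duration.ofSeconds(1));
      try {
        // Bounded commit: gives up with TimeoutException instead of blocking indefinitely.
        consumer.commitSync(Duration.ofSeconds(3));
      } catch (TimeoutException e) {
        System.err.println("offset commit did not complete within 3s: " + e.getMessage());
      }
    }
  }
}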

@Test
public void testSeekToBeginningAndEnd() {
String topic = "testSeekToBeginningAndEnd";
@@ -916,6 +946,8 @@ public void testSearchOffsetByTimestamp() {
consumer.assign(timestampsToSearch.keySet());
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
+      assertNotNull(entry.getValue(), "Failed to find offset for topic partition " + entry.getKey() +
+          " for timestamp " + timestampsToSearch.get(entry.getKey()));
consumer.seek(entry.getKey(), entry.getValue().offset());
}

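The assertNotNull added above guards against documented offsetsForTimes behavior: the returned map holds a null value for any partition that has no record with a timestamp at or after the one requested. A hedged sketch of that defensive pattern outside the test harness (seekToTimestamp is an illustrative helper, not part of the client API):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public final class OffsetsForTimesExample {
  // Seeks a partition to the earliest offset whose timestamp is >= timestampMs,
  // skipping it when offsetsForTimes reports no such offset (a null entry).
  static void seekToTimestamp(Consumer<?, ?> consumer, TopicPartition tp, long timestampMs) {
    Map<TopicPartition, OffsetAndTimestamp> offsets =
        consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMs));
    OffsetAndTimestamp match = offsets.get(tp);
    if (match != null) {
      consumer.seek(tp, match.offset());
    } // else: every record in tp is older than timestampMs, so there is nothing to seek to
  }
}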
@@ -1289,7 +1321,8 @@ public void testFallOffStartWithLiClosest() throws InterruptedException {
produceRecordsWithKafkaProducer();
}
if (currentLso == initialLso) {
-      throw new IllegalStateException("nothing was truncated broker-side within timeout");
+      throw new IllegalStateException("nothing was truncated broker-side within timeout. LogStartOffset = " +
+          currentLso + " remains the same after " + giveUp + "ms.");
}
truncatedStartOffset = currentLso;
}
@@ -6,7 +6,7 @@

import java.io.File;
import java.io.IOException;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.testng.Assert;
import org.testng.annotations.Test;

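Every remaining file in this commit carries the same one-line fix: the 0.11-era org.apache.kafka.common.protocol.SecurityProtocol is gone in the 2.0 clients, and the public class now lives in org.apache.kafka.common.security.auth. A minimal sketch confirming that only the package changed, not the enum itself:

// Gone in the 2.0 clients: import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class SecurityProtocolExample {
  public static void main(String[] args) {
    // Constants such as PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL are unchanged.
    SecurityProtocol protocol = SecurityProtocol.PLAINTEXT;
    System.out.println(protocol.name() + " (id=" + protocol.id + ")");
  }
}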
@@ -6,7 +6,7 @@

import java.io.File;
import java.io.IOException;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.testng.Assert;


@@ -18,7 +18,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

@@ -11,7 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;


public abstract class AbstractKafkaIntegrationTestHarness extends AbstractZookeeperTestHarness {
@@ -12,7 +12,7 @@
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;


@@ -10,7 +10,7 @@
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;


public class EmbeddedBrokerBuilder {
@@ -67,7 +67,7 @@ public EmbeddedBrokerBuilder logRetention(long millis) {
this.logRetentionMs = millis;
return this;
}

public EmbeddedBrokerBuilder enable(SecurityProtocol protocol) {
switch (protocol) {
case PLAINTEXT:
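The enable(SecurityProtocol) switch is truncated in this view. As a hedged illustration of the general pattern it opens with, a standalone sketch (listenerFor is a hypothetical helper, not EmbeddedBrokerBuilder's actual code):

import org.apache.kafka.common.security.auth.SecurityProtocol;

public class ListenerExample {
  // Hypothetical helper mapping a protocol to a listener string, the way an
  // embedded-broker builder might; not the real EmbeddedBrokerBuilder logic.
  static String listenerFor(SecurityProtocol protocol, String host, int port) {
    switch (protocol) {
      case PLAINTEXT:
      case SSL:
        return protocol.name() + "://" + host + ":" + port;
      default:
        throw new IllegalArgumentException("unsupported protocol: " + protocol);
    }
  }

  public static void main(String[] args) {
    System.out.println(listenerFor(SecurityProtocol.PLAINTEXT, "localhost", 9092)); // PLAINTEXT://localhost:9092
  }
}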
