Skip to content

Commit

Permalink
Update Kafka client to 0.10.2.2
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Jan 26, 2019
1 parent a881632 commit b0f1795
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 21 deletions.
16 changes: 0 additions & 16 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -885,22 +885,6 @@
<version>9.6.3-3</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions presto-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>

<dep.kafka.version>0.10.2.2</dep.kafka.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -60,6 +62,17 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${dep.kafka.version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.prestosql.spi.connector.ConnectorTransactionHandle;
import io.prestosql.spi.connector.FixedSplitSource;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.cluster.BrokerEndPoint;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
Expand Down Expand Up @@ -100,7 +100,7 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
for (PartitionMetadata part : metadata.partitionsMetadata()) {
log.debug("Adding Partition %s/%s", metadata.topic(), part.partitionId());

Broker leader = part.leader();
BrokerEndPoint leader = part.leader();
if (leader == null) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Leader election in progress for Kafka topic '%s' partition %s", metadata.topic(), part.partitionId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.io.Closeable;
import java.io.File;
Expand Down Expand Up @@ -86,7 +89,7 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
.putAll(Maps.fromProperties(overrideProperties))
.build();

KafkaConfig config = new KafkaConfig(toProperties(properties));
KafkaConfig config = new KafkaConfig(properties);
this.kafka = new KafkaServerStartable(config);
}

Expand Down Expand Up @@ -120,10 +123,17 @@ public void createTopics(int partitions, int replication, Properties topicProper
{
checkState(started.get() && !stopped.get(), "not started!");

ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30_000, 30_000, ZKStringSerializer$.MODULE$);
ZkConnection zkConnection = new ZkConnection(getZookeeperConnectString(), 30_000);
ZkClient zkClient = new ZkClient(zkConnection, 30_000, ZKStringSerializer$.MODULE$);
try {
for (String topic : topics) {
AdminUtils.createTopic(zkClient, topic, partitions, replication, topicProperties);
AdminUtils.createTopic(
new ZkUtils(zkClient, zkConnection, false),
topic,
partitions,
replication,
topicProperties,
RackAwareMode.Disabled$.MODULE$);
}
}
finally {
Expand Down

0 comments on commit b0f1795

Please sign in to comment.