Convert Range to record in Kafka
ebyhr authored and wendigo committed Apr 22, 2024
1 parent 34102eb commit 0e67f49
Showing 5 changed files with 22 additions and 52 deletions.
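The change replaces a hand-written value class with a Java record: the compiler derives the canonical constructor, component-named accessors (begin() rather than getBegin()), and equals/hashCode/toString, which is why the explicit constructor, getters, and toStringHelper-based toString in Range.java below can all be deleted, and why every call site switches from getBegin()/getEnd() to begin()/end(). A minimal sketch of the equivalence (illustration only, not code from the commit):

public record Range(long begin, long end) {}

// Compiler-derived members in action:
//   new Range(0, 10).begin()                   -> 0, component-named accessor, no "get" prefix
//   new Range(0, 10).toString()                -> "Range[begin=0, end=10]"
//   new Range(0, 10).equals(new Range(0, 10))  -> true, component-wise equality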
@@ -113,23 +113,23 @@ public KafkaFilteringResult getKafkaFilterResult(
         if (offsetRanged.isPresent()) {
             Range range = offsetRanged.get();
             partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
-                    partition -> (range.getBegin() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.getBegin()) : Optional.empty());
+                    partition -> (range.begin() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.begin()) : Optional.empty());
             partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
-                    partition -> (range.getEnd() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.getEnd()) : Optional.empty());
+                    partition -> (range.end() != INVALID_KAFKA_RANGE_INDEX) ? Optional.of(range.end()) : Optional.empty());
         }

         // push down timestamp if possible
         if (offsetTimestampRanged.isPresent()) {
             try (KafkaConsumer<byte[], byte[]> kafkaConsumer = consumerFactory.create(session)) {
                 // filter negative value to avoid java.lang.IllegalArgumentException when using KafkaConsumer offsetsForTimes
-                if (offsetTimestampRanged.get().getBegin() > INVALID_KAFKA_RANGE_INDEX) {
+                if (offsetTimestampRanged.get().begin() > INVALID_KAFKA_RANGE_INDEX) {
                     partitionBeginOffsets = overridePartitionBeginOffsets(partitionBeginOffsets,
-                            partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getBegin()));
+                            partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().begin()));
                 }
                 if (isTimestampUpperBoundPushdownEnabled(session, kafkaTableHandle.topicName())) {
-                    if (offsetTimestampRanged.get().getEnd() > INVALID_KAFKA_RANGE_INDEX) {
+                    if (offsetTimestampRanged.get().end() > INVALID_KAFKA_RANGE_INDEX) {
                         partitionEndOffsets = overridePartitionEndOffsets(partitionEndOffsets,
-                                partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().getEnd()));
+                                partition -> findOffsetsForTimestampGreaterOrEqual(kafkaConsumer, partition, offsetTimestampRanged.get().end()));
                     }
                 }
             }
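Note on the guard above: KafkaConsumer.offsetsForTimes throws IllegalArgumentException for negative target timestamps, so both branches first compare against the INVALID_KAFKA_RANGE_INDEX sentinel. The body of findOffsetsForTimestampGreaterOrEqual is not part of this diff; a plausible sketch of such a helper, shown only for orientation (imports from org.apache.kafka.clients.consumer, org.apache.kafka.common, and java.util assumed):

// Hypothetical sketch; the real helper in the filter manager may differ.
// offsetsForTimes returns, per partition, the earliest offset whose record
// timestamp is >= the requested timestamp, or null when no such record exists.
private static Optional<Long> findOffsetsForTimestampGreaterOrEqual(
        KafkaConsumer<byte[], byte[]> consumer, TopicPartition partition, long timestamp)
{
    OffsetAndTimestamp result = consumer.offsetsForTimes(Map.of(partition, timestamp)).get(partition);
    return Optional.ofNullable(result).map(OffsetAndTimestamp::offset);
}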
@@ -122,7 +122,7 @@ private KafkaRecordCursor()
             topicPartition = new TopicPartition(split.getTopicName(), split.getPartitionId());
             kafkaConsumer = consumerFactory.create(connectorSession);
             kafkaConsumer.assign(ImmutableList.of(topicPartition));
-            kafkaConsumer.seek(topicPartition, split.getMessagesRange().getBegin());
+            kafkaConsumer.seek(topicPartition, split.getMessagesRange().begin());
         }

@Override
@@ -150,7 +150,7 @@ public boolean advanceNextPosition()
             if (records.hasNext()) {
                 return nextRow(records.next());
             }
-            if (kafkaConsumer.position(topicPartition) >= split.getMessagesRange().getEnd()) {
+            if (kafkaConsumer.position(topicPartition) >= split.getMessagesRange().end()) {
                 return false;
             }
             records = kafkaConsumer.poll(Duration.ofMillis(CONSUMER_POLL_TIMEOUT)).iterator();
@@ -161,7 +161,7 @@ private boolean nextRow(ConsumerRecord<byte[], byte[]> message)
     {
         requireNonNull(message, "message is null");

-        if (message.offset() >= split.getMessagesRange().getEnd()) {
+        if (message.offset() >= split.getMessagesRange().end()) {
             return false;
         }
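Taken together, the cursor bounds consumption to the split's half-open offset range: the constructor seeks to messagesRange.begin(), advanceNextPosition stops polling once the consumer position reaches messagesRange.end(), and nextRow rejects any record that polling overshot past the end. The pattern reduced to its essentials (a sketch under assumed names, not code from this commit):

// consumer is assigned to a single partition; range is a Range as above.
consumer.seek(partition, range.begin());                 // start of split, inclusive
while (consumer.position(partition) < range.end()) {     // end of split, exclusive
    for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
        if (record.offset() >= range.end()) {
            break;                                       // poll can return records past the range
        }
        process(record);                                 // hypothetical row-building step
    }
}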

@@ -135,7 +135,7 @@ public long getRetainedSizeInBytes()
                 + estimatedSizeOf(messageDataFormat)
                 + sizeOf(keyDataSchemaContents, SizeOf::estimatedSizeOf)
                 + sizeOf(messageDataSchemaContents, SizeOf::estimatedSizeOf)
-                + messagesRange.getRetainedSizeInBytes()
+                + messagesRange.retainedSizeInBytes()
                 + leader.getRetainedSizeInBytes();
}

42 changes: 6 additions & 36 deletions plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/Range.java
@@ -13,42 +13,21 @@
  */
 package io.trino.plugin.kafka;

-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;

 import java.util.List;

-import static com.google.common.base.MoreObjects.toStringHelper;
 import static io.airlift.slice.SizeOf.instanceSize;
 import static java.lang.Math.min;

-public class Range
+/**
+ * @param begin inclusive
+ * @param end exclusive
+ */
+public record Range(long begin, long end)
 {
     private static final int INSTANCE_SIZE = instanceSize(Range.class);

-    private final long begin; // inclusive
-    private final long end; // exclusive
-
-    @JsonCreator
-    public Range(@JsonProperty long begin, @JsonProperty long end)
-    {
-        this.begin = begin;
-        this.end = end;
-    }
-
-    @JsonProperty
-    public long getBegin()
-    {
-        return begin;
-    }
-
-    @JsonProperty
-    public long getEnd()
-    {
-        return end;
-    }
-
     public List<Range> partition(int partitionSize)
     {
         ImmutableList.Builder<Range> partitions = ImmutableList.builder();
@@ -60,16 +39,7 @@ public List<Range> partition(int partitionSize)
         return partitions.build();
     }

-    @Override
-    public String toString()
-    {
-        return toStringHelper(this)
-                .add("begin", begin)
-                .add("end", end)
-                .toString();
-    }
-
-    public long getRetainedSizeInBytes()
+    public long retainedSizeInBytes()
     {
         return INSTANCE_SIZE;
     }
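The middle of partition is collapsed in this view. A plausible body, consistent with the ImmutableList and Math.min imports the diff keeps (a reconstruction, not verbatim from the file), splits [begin, end) into consecutive sub-ranges of at most partitionSize offsets:

public List<Range> partition(int partitionSize)
{
    ImmutableList.Builder<Range> partitions = ImmutableList.builder();
    for (long start = begin; start < end; start += partitionSize) {
        partitions.add(new Range(start, min(start + partitionSize, end)));
    }
    return partitions.build();
}

// Example: new Range(0, 10).partition(4)
//   -> [Range[begin=0, end=4], Range[begin=4, end=8], Range[begin=8, end=10]]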
@@ -51,21 +51,21 @@ public void testFilterRangeByDomain()
     {
         Domain testDomain = Domain.singleValue(BIGINT, 1L);
         assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).isPresent()).isTrue();
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getBegin()).isEqualTo(1L);
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getEnd()).isEqualTo(2L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().begin()).isEqualTo(1L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().end()).isEqualTo(2L);

         testDomain = multipleValues(BIGINT, ImmutableList.of(3L, 8L));
         assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).isPresent()).isTrue();
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getBegin()).isEqualTo(3L);
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getEnd()).isEqualTo(9L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().begin()).isEqualTo(3L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().end()).isEqualTo(9L);

         testDomain = Domain.create(SortedRangeSet.copyOf(BIGINT,
                 ImmutableList.of(
                         Range.range(BIGINT, 2L, true, 4L, true))),
                 false);

         assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).isPresent()).isTrue();
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getBegin()).isEqualTo(2L);
-        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().getEnd()).isEqualTo(5L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().begin()).isEqualTo(2L);
+        assertThat(KafkaFilterManager.filterRangeByDomain(testDomain).get().end()).isEqualTo(5L);
}
}
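The tests also show that filterRangeByDomain produces half-open ranges: end is one past the largest matching value (1 -> [1, 2), {3, 8} -> [3, 9), [2, 4] inclusive -> [2, 5)). Separately, dropping @JsonCreator/@JsonProperty works because Jackson binds records natively (record support landed in jackson-databind 2.12), using the canonical constructor and component accessors. A minimal round-trip, assuming a Jackson version with record support on the classpath:

import com.fasterxml.jackson.databind.ObjectMapper;

public class RangeJsonDemo
{
    public static void main(String[] args) throws Exception
    {
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(new Range(3, 9)); // {"begin":3,"end":9}
        Range restored = mapper.readValue(json, Range.class);     // canonical constructor bound automatically
        System.out.println(restored);                             // Range[begin=3, end=9]
    }
}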
