Skip to content

Commit

Permalink
Merge pull request #1 from MikeEdgar/latest-records-changes
Browse files (browse the repository at this point in the history)
Optimize end of poll detection and limit size of materialized record set
  • Loading branch information
biswassri authored Aug 19, 2022
2 parents 32a5b24 + 3a44da6 commit 12ed496
Showing 1 changed file with 42 additions and 10 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -35,8 +35,11 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -61,7 +64,7 @@ public Types.PagedResponse<Types.Record> consumeRecords(String topicName,
List<String> include,
Integer maxValueLength) {

try (Consumer<byte[], byte[]> consumer = clientFactory.createConsumer(limit)) {
try (Consumer<byte[], byte[]> consumer = clientFactory.createConsumer(null)) {
List<PartitionInfo> partitions = consumer.partitionsFor(topicName);

if (partitions.isEmpty()) {
Expand Down Expand Up @@ -116,47 +119,76 @@ public Types.PagedResponse<Types.Record> consumeRecords(String topicName,
});
}
Instant timeout = Instant.now().plusSeconds(2);
int maxRecords = assignments.size() * limit;
List<Types.Record> results = new ArrayList<>();
AtomicInteger recordsConsumed = new AtomicInteger(0);

Iterable<ConsumerRecords<byte[], byte[]>> poll = () -> new Iterator<>() {
boolean emptyPoll = false;

@Override
public boolean hasNext() {
return results.size() < assignments.size() * limit && Instant.now().compareTo(timeout) < 0;
return !emptyPoll && recordsConsumed.get() < maxRecords && Instant.now().isBefore(timeout);
}

@Override
public ConsumerRecords<byte[], byte[]> next() {
return consumer.poll(Duration.ofMillis(100));
var records = consumer.poll(Duration.ofMillis(100));
int pollSize = records.count();
emptyPoll = (pollSize == 0);
recordsConsumed.addAndGet(pollSize);
if (log.isTraceEnabled()) {
log.tracef("next() consumed records: %d; total %s", pollSize, recordsConsumed.get());
}
return records;
}
};

Comparator<ConsumerRecord<byte[], byte[]>> comparator = Comparator.comparing(ConsumerRecord::timestamp);
Comparator<ConsumerRecord<byte[], byte[]>> comparator = Comparator.comparingLong(ConsumerRecord::timestamp);
if (timestamp == null && offset == null) {
comparator = comparator.reversed();
}
comparator = comparator.thenComparing(ConsumerRecord::partition);
comparator = comparator
.thenComparingInt(ConsumerRecord::partition)
.thenComparingLong(ConsumerRecord::offset);

NavigableSet<ConsumerRecord<byte[], byte[]>> limitSet = new TreeSet<>(comparator) {
private static final long serialVersionUID = 1L;
@Override
public boolean add(ConsumerRecord<byte[], byte[]> rec) {
boolean added = super.add(rec);
if (size() > limit) {
pollLast();
}
return added;
}
};

StreamSupport.stream(poll.spliterator(), false)
.flatMap(records -> StreamSupport.stream(records.spliterator(), false))
.sorted(comparator)
.limit(limit)
.collect(Collectors.toCollection(() -> limitSet))
.stream()
.map(rec -> getItems(rec, topicName, include, maxValueLength))
.forEach(results::add);

if (log.isDebugEnabled()) {
log.debugf("Total consumed records: %d", recordsConsumed.get());
}

return Types.PagedResponse.forItems(Types.Record.class, results);

}
}

public Types.Record getItems(ConsumerRecord rec, String topicName, List<String> include, Integer maxValueLength) {
public Types.Record getItems(ConsumerRecord<byte[], byte[]> rec, String topicName, List<String> include, Integer maxValueLength) {
Types.Record item = new Types.Record(topicName);

setProperty(Types.Record.PROP_PARTITION, include, rec::partition, item::setPartition);
setProperty(Types.Record.PROP_OFFSET, include, rec::offset, item::setOffset);
setProperty(Types.Record.PROP_TIMESTAMP, include, () -> timestampToString(rec.timestamp()), item::setTimestamp);
setProperty(Types.Record.PROP_TIMESTAMP_TYPE, include, rec.timestampType()::name, item::setTimestampType);
setProperty(Types.Record.PROP_KEY, include, rec::key, k -> item.setKey(bytesToString((byte[]) k, maxValueLength)));
setProperty(Types.Record.PROP_VALUE, include, rec::value, v -> item.setValue(bytesToString((byte[]) v, maxValueLength)));
setProperty(Types.Record.PROP_KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength)));
setProperty(Types.Record.PROP_VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength)));
setProperty(Types.Record.PROP_HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders);
item.updateHref();

Expand Down

0 comments on commit 12ed496

Please sign in to comment.