MGDSTRM-9146 - Admin server does not return latest from all partitions (#222)

* MGDSTRM-9146 - Admin server does not return latest from all partitions

* Optimize end of poll detection and limit size of materialized record set

* Updating Systems

* Updating System Test

* Adding Exceptions

* Fix NoSuchElementException

Co-authored-by: Michael Edgar <[email protected]>
biswassri and MikeEdgar authored Aug 24, 2022
1 parent c187078 commit 3adf9b3
Showing 2 changed files with 93 additions and 26 deletions.
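
The second commit bullet is the core of the fix: instead of materializing every polled record, consumeRecords now keeps at most `limit` records in a sorted set that evicts its tail whenever it grows past the cap. A minimal, broker-free sketch of that eviction pattern (the class name CappedNewest, CAP, and the long timestamps are illustrative, not part of the commit):

import java.util.Comparator;
import java.util.TreeSet;

public class CappedNewest {
    static final int CAP = 3; // stands in for the request's "limit" parameter

    public static void main(String[] args) {
        // Descending order keeps the newest timestamps first; pollLast()
        // evicts the oldest retained element once the cap is exceeded.
        TreeSet<Long> newest = new TreeSet<Long>(Comparator.reverseOrder()) {
            @Override
            public boolean add(Long timestamp) {
                boolean added = super.add(timestamp);
                if (size() > CAP) {
                    pollLast();
                }
                return added;
            }
        };

        for (long ts : new long[] {5, 1, 9, 3, 7}) {
            newest.add(ts);
        }
        System.out.println(newest); // prints [9, 7, 5]
    }
}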
org/bf2/admin/kafka/admin/RecordOperations.java
@@ -1,6 +1,8 @@
 package org.bf2.admin.kafka.admin;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
@@ -26,17 +28,25 @@
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
 
 @RequestScoped
 public class RecordOperations {
 
@@ -55,7 +65,7 @@ public Types.PagedResponse<Types.Record> consumeRecords(String topicName,
                                                   List<String> include,
                                                   Integer maxValueLength) {
 
-        try (Consumer<byte[], byte[]> consumer = clientFactory.createConsumer(limit)) {
+        try (Consumer<byte[], byte[]> consumer = clientFactory.createConsumer(null)) {
             List<PartitionInfo> partitions = consumer.partitionsFor(topicName);
 
             if (partitions.isEmpty()) {
@@ -109,30 +119,86 @@ public Types.PagedResponse<Types.Record> consumeRecords(String topicName,
                     }
                 });
             }
-            var records = consumer.poll(Duration.ofSeconds(2));
-
-            List<Types.Record> items = StreamSupport.stream(records.spliterator(), false)
-                .map(rec -> {
-                    Types.Record item = new Types.Record(topicName);
-
-                    setProperty(Types.Record.PROP_PARTITION, include, rec::partition, item::setPartition);
-                    setProperty(Types.Record.PROP_OFFSET, include, rec::offset, item::setOffset);
-                    setProperty(Types.Record.PROP_TIMESTAMP, include, () -> timestampToString(rec.timestamp()), item::setTimestamp);
-                    setProperty(Types.Record.PROP_TIMESTAMP_TYPE, include, rec.timestampType()::name, item::setTimestampType);
-                    setProperty(Types.Record.PROP_KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength)));
-                    setProperty(Types.Record.PROP_VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength)));
-                    setProperty(Types.Record.PROP_HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders);
-                    item.updateHref();
-
-                    return item;
-                })
-                .collect(Collectors.toList());
-
-            return Types.PagedResponse.forItems(Types.Record.class, items);
+            Instant timeout = Instant.now().plusSeconds(2);
+            int maxRecords = assignments.size() * limit;
+            List<Types.Record> results = new ArrayList<>();
+            AtomicInteger recordsConsumed = new AtomicInteger(0);
+
+            Iterable<ConsumerRecords<byte[], byte[]>> poll = () -> new Iterator<>() {
+                boolean emptyPoll = false;
+
+                @Override
+                public boolean hasNext() {
+                    return !emptyPoll && recordsConsumed.get() < maxRecords && Instant.now().isBefore(timeout);
+                }
+
+                @Override
+                public ConsumerRecords<byte[], byte[]> next() {
+                    if (!hasNext()) {
+                        throw new NoSuchElementException();
+                    }
+                    var records = consumer.poll(Duration.ofMillis(100));
+                    int pollSize = records.count();
+                    emptyPoll = pollSize == 0;
+                    recordsConsumed.addAndGet(pollSize);
+                    if (log.isTraceEnabled()) {
+                        log.tracef("next() consumed records: %d; total %s", pollSize, recordsConsumed.get());
+                    }
+                    return records;
+                }
+            };
+
+            Comparator<ConsumerRecord<byte[], byte[]>> comparator = Comparator.comparingLong(ConsumerRecord::timestamp);
+            if (timestamp == null && offset == null) {
+                comparator = comparator.reversed();
+            }
+            comparator = comparator
+                    .thenComparingInt(ConsumerRecord::partition)
+                    .thenComparingLong(ConsumerRecord::offset);
+
+            NavigableSet<ConsumerRecord<byte[], byte[]>> limitSet = new TreeSet<>(comparator) {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public boolean add(ConsumerRecord<byte[], byte[]> rec) {
+                    boolean added = super.add(rec);
+                    if (size() > limit) {
+                        pollLast();
+                    }
+                    return added;
+                }
+            };
+
+            StreamSupport.stream(poll.spliterator(), false)
+                .flatMap(records -> StreamSupport.stream(records.spliterator(), false))
+                .collect(Collectors.toCollection(() -> limitSet))
+                .stream()
+                .map(rec -> getItems(rec, topicName, include, maxValueLength))
+                .forEach(results::add);
+
+            if (log.isDebugEnabled()) {
+                log.debugf("Total consumed records: %d", recordsConsumed.get());
+            }
+
+            return Types.PagedResponse.forItems(Types.Record.class, results);
         }
     }
 
+    public Types.Record getItems(ConsumerRecord<byte[], byte[]> rec, String topicName, List<String> include, Integer maxValueLength) {
+        Types.Record item = new Types.Record(topicName);
+
+        setProperty(Types.Record.PROP_PARTITION, include, rec::partition, item::setPartition);
+        setProperty(Types.Record.PROP_OFFSET, include, rec::offset, item::setOffset);
+        setProperty(Types.Record.PROP_TIMESTAMP, include, () -> timestampToString(rec.timestamp()), item::setTimestamp);
+        setProperty(Types.Record.PROP_TIMESTAMP_TYPE, include, rec.timestampType()::name, item::setTimestampType);
+        setProperty(Types.Record.PROP_KEY, include, rec::key, k -> item.setKey(bytesToString(k, maxValueLength)));
+        setProperty(Types.Record.PROP_VALUE, include, rec::value, v -> item.setValue(bytesToString(v, maxValueLength)));
+        setProperty(Types.Record.PROP_HEADERS, include, () -> headersToMap(rec.headers(), maxValueLength), item::setHeaders);
+        item.updateHref();
+
+        return item;
+    }
+
     public CompletionStage<Types.Record> produceRecord(String topicName, Types.Record input) {
         CompletableFuture<Types.Record> promise = new CompletableFuture<>();
         Producer<String, String> producer = clientFactory.createProducer();
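
The Iterable-over-poll construct above is what replaces the old single poll(Duration.ofSeconds(2)): iteration ends on the first empty poll, once assignments.size() * limit records have been read, or when the two-second deadline passes. A standalone sketch of the same termination logic, with the Kafka consumer replaced by a hard-coded batch array (the batches, maxRecords, and deadline values are illustrative assumptions) so it runs without a broker:

import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPolling {
    public static void main(String[] args) {
        int maxRecords = 10;                 // plays the role of assignments.size() * limit
        Instant deadline = Instant.now().plus(Duration.ofSeconds(2));
        AtomicInteger consumed = new AtomicInteger(0);
        int[] batches = {4, 3, 0, 5};        // 0 simulates an empty poll
        AtomicInteger cursor = new AtomicInteger(0);

        Iterable<Integer> polls = () -> new Iterator<Integer>() {
            boolean emptyPoll = false;

            @Override
            public boolean hasNext() {
                // Stop on the first empty poll, once enough records arrived,
                // or when the overall deadline passes.
                return !emptyPoll && consumed.get() < maxRecords && Instant.now().isBefore(deadline);
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int size = cursor.get() < batches.length ? batches[cursor.getAndIncrement()] : 0;
                emptyPoll = size == 0;
                consumed.addAndGet(size);
                return size;
            }
        };

        polls.forEach(size -> System.out.println("poll returned " + size));
        // prints 4, 3, 0; iteration stops at the first empty poll
    }
}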
Record endpoint system test:
@@ -22,6 +22,7 @@
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -229,29 +230,29 @@ void testConsumeRecordsByStartingOffset(int startingOffset, int expectedResults)
 
     @ParameterizedTest
     @CsvSource({
-        "1",
-        "2",
-        "3",
-        "4",
-        "20"
+        "10",
+        "20",
+        "30",
+        "40",
+        "200"
     })
     void testConsumeLatestRecords(int limit) {
         final String topicName = UUID.randomUUID().toString();
-        final int totalRecords = 10;
-        topicUtils.createTopics(List.of(topicName), 1, Status.CREATED); // single partition
+        final int totalRecords = 100;
+        topicUtils.createTopics(List.of(topicName), 10, Status.CREATED); // ten partitions
         List<String> messageValues = new ArrayList<>();
 
         for (int i = 0; i < totalRecords; i++) {
             String value = "the-value-" + i;
             messageValues.add(value);
             recordUtils.produceRecord(topicName, null, null, "the-key-" + i, value);
         }
+        Collections.reverse(messageValues);
 
         int resultCount = Math.min(limit, totalRecords);
 
         given()
             .log().ifValidationFails()
-            .queryParam("partition", 0)
             .queryParam("limit", limit)
         .when()
             .get(RecordUtils.RECORDS_PATH, topicName)
@@ -260,7 +261,7 @@ void testConsumeLatestRecords(int limit) {
             .statusCode(Status.OK.getStatusCode())
             .body("total", equalTo(resultCount))
             .body("items", hasSize(resultCount))
-            .body("items.findAll { it }.value", contains(messageValues.subList(totalRecords - resultCount, totalRecords).toArray(String[]::new)));
+            .body("items.findAll { it }.value", contains(messageValues.subList(0, resultCount).toArray(String[]::new)));
     }
 
     @Test
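
The reworked assertion tracks the new default ordering: with no timestamp or offset filter the endpoint returns the newest records first, so the test reverses its expected values and asserts on a prefix rather than a suffix. A small sketch showing that the two subList expressions select the same records, just in opposite order (the totalRecords and resultCount values here are illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExpectationOrder {
    public static void main(String[] args) {
        int totalRecords = 5;
        int resultCount = 3;

        List<String> produced = new ArrayList<>();
        for (int i = 0; i < totalRecords; i++) {
            produced.add("the-value-" + i);
        }

        // Old expectation: oldest-first suffix of what was produced.
        List<String> suffix = produced.subList(totalRecords - resultCount, totalRecords);
        System.out.println(suffix); // [the-value-2, the-value-3, the-value-4]

        // New expectation: newest-first prefix after reversing.
        List<String> reversed = new ArrayList<>(produced);
        Collections.reverse(reversed);
        System.out.println(reversed.subList(0, resultCount)); // [the-value-4, the-value-3, the-value-2]
    }
}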
