Skip to content

Commit

Permalink
fix: improve print topic (#5552)
Browse files Browse the repository at this point in the history
PRINT topic tries to determine the format of a topic's key and value.  Sometimes it can mis-detect the key. For example, if the key is a string, and the first value is `Die Hard` (as bytes) then it would interpret this as a `BIGINT`, as it's eight bytes long.  Once it sees more rows it will likely work out the key is a `STRING` (assuming at least one key is not 8 bytes long).

With this change PRINT will process the whole batch of messages retrieved from the broker to determine the formats and only then output them. This makes it much more likely it will output the first few rows with the right formats, at the cost of formatting all the rows in the batch, even if the user only wanted to see one or two.

Co-authored-by: Andy Coates <[email protected]>
  • Loading branch information
big-andy-coates and big-andy-coates authored Jun 5, 2020
1 parent 3321d3f commit e193576
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -112,8 +110,8 @@ public Collection<String> poll() {
return null;
}

final Collection<Supplier<String>> formatted = formatter.format(records);
final Collection<Supplier<String>> limited = new LimitIntervalCollection<>(
final Collection<String> formatted = formatter.format(records);
final Collection<String> limited = new LimitIntervalCollection<>(
formatted,
printTopic.getLimit().orElse(Integer.MAX_VALUE) - numWritten,
printTopic.getIntervalValue(),
Expand All @@ -128,9 +126,7 @@ public Collection<String> poll() {
setDone();
}

return limited.stream()
.map(Supplier::get)
.collect(Collectors.toList());
return limited;
} catch (final Exception e) {
setError(e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,25 @@ public RecordFormatter(
this.valueDeserializers = requireNonNull(valueDeserializers, "valueDeserializers");
}

public List<Supplier<String>> format(final Iterable<ConsumerRecord<Bytes, Bytes>> records) {
return StreamSupport.stream(records.spliterator(), false)
.map(this::delayedFormat)
.collect(Collectors.toList());
/**
 * Formats every record in the supplied batch.
 *
 * <p>The first entry of each deserializer's possible-formats list is treated as the
 * active format. It is captured before the batch is formatted and compared with the
 * first entry afterwards. If either the key or the value entry differs, the whole
 * batch is formatted a second time and that second result is returned.
 *
 * @param records the batch of records to format; it may be iterated twice.
 * @return one formatted line per record, in iteration order.
 */
public List<String> format(final Iterable<ConsumerRecord<Bytes, Bytes>> records) {
  final String keyFormatBefore = keyDeserializers.getPossibleFormats().get(0);
  final String valueFormatBefore = valueDeserializers.getPossibleFormats().get(0);

  final List<String> firstPass = formatRecords(records);

  final String keyFormatAfter = keyDeserializers.getPossibleFormats().get(0);
  final boolean keyFormatChanged = !keyFormatBefore.equals(keyFormatAfter);

  final String valueFormatAfter = valueDeserializers.getPossibleFormats().get(0);
  final boolean valueFormatChanged = !valueFormatBefore.equals(valueFormatAfter);

  if (keyFormatChanged || valueFormatChanged) {
    // The active key/value format has been eliminated as a possibility while processing
    // this batch, so the first pass may contain rows rendered with the stale format.
    // Reformat with the new active format:
    return formatRecords(records);
  }

  return firstPass;
}

/**
Expand All @@ -133,8 +148,14 @@ public List<String> getPossibleValueFormats() {
return valueDeserializers.getPossibleFormats();
}

private Supplier<String> delayedFormat(final ConsumerRecord<Bytes, Bytes> record) {
return () -> "rowtime: " + formatRowTime(record.timestamp())
/**
 * Formats each record of the batch, sequentially and in iteration order.
 *
 * @param records the records to format.
 * @return the formatted lines, one per record.
 */
private List<String> formatRecords(final Iterable<ConsumerRecord<Bytes, Bytes>> records) {
  // Sequential (non-parallel) stream: records are handled one at a time, in order.
  return StreamSupport
      .stream(records.spliterator(), false)
      .map(record -> formatRecord(record))
      .collect(Collectors.toList());
}

/**
 * Renders a single record as {@code rowtime: <time>, key: <key>, value: <value>}.
 *
 * @param record the record to render.
 * @return the rendered line.
 */
private String formatRecord(final ConsumerRecord<Bytes, Bytes> record) {
  // Parts are evaluated in the order: row time, key, value.
  final String rowTimePart = "rowtime: " + formatRowTime(record.timestamp());
  final String keyPart = "key: " + keyDeserializers.format(record.key());
  final String valuePart = ", value: " + valueDeserializers.format(record.value());

  return rowTimePart + ", " + keyPart + valuePart;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.Bytes;
Expand Down Expand Up @@ -113,16 +112,16 @@ public void write(final OutputStream out) {
continue;
}

final List<Supplier<String>> values = formatter.format(records.records(topicName));
final List<String> values = formatter.format(records.records(topicName));
if (values.isEmpty()) {
continue;
}

final List<String> toOutput = new ArrayList<>();
for (final Supplier<String> value : values) {
for (final String value : values) {
if (messagesPolled++ % interval == 0) {
messagesWritten++;
toOutput.add(value.get());
toOutput.add(value);
}

if (limitReached.test(messagesWritten)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -53,7 +51,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
Expand Down Expand Up @@ -109,6 +106,9 @@ public static class MainClassTest {

@Before
public void setUp() {
when(keyDeserializers.getPossibleFormats()).thenReturn(ImmutableList.of("key-format"));
when(valueDeserializers.getPossibleFormats()).thenReturn(ImmutableList.of("value-format"));

formatter = new RecordFormatter(
keyDeserializers,
valueDeserializers
Expand Down Expand Up @@ -166,24 +166,10 @@ public void shouldStayWithUnknownKeyFormatIfProcessingNullValues() {
}

@Test
public void shouldDelayFormatting() {
public void shouldFormat() {
// When:
formatter.format(consumerRecords(KEY_BYTES, VALUE_BYTES));

// Then:
verify(keyDeserializers, never()).format(any());
verify(valueDeserializers, never()).format(any());
}

@Test
public void shouldFormatOnDemand() {
// Given:
final List<Supplier<String>> result = formatter
.format(consumerRecords(KEY_BYTES, VALUE_BYTES));

// When:
result.get(0).get();

// Then:
verify(keyDeserializers).format(KEY_BYTES);
verify(valueDeserializers).format(VALUE_BYTES);
Expand Down Expand Up @@ -254,9 +240,9 @@ public void shouldReturnPossibleValueFormats() {
}

private String formatSingle(final ConsumerRecord<Bytes, Bytes> consumerRecord) {
final List<Supplier<String>> result = formatter.format(ImmutableList.of(consumerRecord));
final List<String> result = formatter.format(ImmutableList.of(consumerRecord));
assertThat(result, hasSize(1));
return result.get(0).get();
return result.get(0);
}

@SuppressWarnings("SameParameterValue")
Expand Down Expand Up @@ -1260,6 +1246,79 @@ private static Serializer<Object> jsonSrSerializer() {
}
}

/**
 * Tests that drive a {@code RecordFormatter} built from a schema registry client and a
 * topic name, rather than from mocked deserializers, to check that a batch is
 * re-formatted when the first record alone is ambiguous.
 */
@RunWith(MockitoJUnitRunner.class)
public static class CombinedTest {

  @Mock
  private SchemaRegistryClient schemaRegistryClient;

  private RecordFormatter formatter;

  @Before
  public void setUp() {
    formatter = new RecordFormatter(schemaRegistryClient, "some-topic");
  }

  @Test
  public void shouldReprocessBatchIfLikelyKeyFormatChanges() {
    // Given:
    // Key that is same size as BIGINT / DOUBLE:
    final Bytes eightByteKey = utf8Bytes("Die Hard");
    final Bytes longerKey = utf8Bytes("Key that's clearly a string");

    final Iterable<ConsumerRecord<Bytes, Bytes>> batch = consumerRecords(
        consumerRecord(eightByteKey, null),
        consumerRecord(longerKey, null)
    );

    // When:
    final List<String> lines = formatter.format(batch);

    // Then:
    final String firstLine = lines.get(0);
    assertThat(firstLine, containsString("Die Hard"));
  }

  @Test
  public void shouldReprocessBatchIfLikelyValueFormatChanges() {
    // Given:
    // Value that is same size as BIGINT / DOUBLE:
    final Bytes eightByteValue = utf8Bytes("Die Hard");
    final Bytes longerValue = utf8Bytes("Key that's clearly a string");

    final Iterable<ConsumerRecord<Bytes, Bytes>> batch = consumerRecords(
        consumerRecord(null, eightByteValue),
        consumerRecord(null, longerValue)
    );

    // When:
    final List<String> lines = formatter.format(batch);

    // Then:
    final String firstLine = lines.get(0);
    assertThat(firstLine, containsString("Die Hard"));
  }

  /** Wraps the UTF-8 encoding of {@code text} as Kafka {@code Bytes}. */
  private static Bytes utf8Bytes(final String text) {
    return Bytes.wrap(text.getBytes(UTF_8));
  }

  /** Collects the supplied records into an immutable, ordered batch. */
  @SuppressWarnings("varargs")
  @SafeVarargs
  private static Iterable<ConsumerRecord<Bytes, Bytes>> consumerRecords(
      final ConsumerRecord<Bytes, Bytes>... records
  ) {
    return ImmutableList.copyOf(records);
  }

  /** Builds a record with fixed metadata and the given key and value, either of which may be null. */
  private static ConsumerRecord<Bytes, Bytes> consumerRecord(
      final Bytes keyBytes,
      final Bytes valueBytes
  ) {
    return new ConsumerRecord<>(
        TOPIC_NAME,
        1,
        1,
        1234L,
        TimestampType.CREATE_TIME,
        123,
        1,
        1,
        keyBytes,
        valueBytes
    );
  }
}

@SafeVarargs
@SuppressWarnings("varargs")
private static <T> Matcher<Collection<T>> notHasItems(final T... items) {
Expand Down

0 comments on commit e193576

Please sign in to comment.