Kafka tombstone message support
vlad-lyutenko committed Apr 12, 2023
1 parent 20841a6 commit 9ae48c4
Showing 2 changed files with 70 additions and 7 deletions.
@@ -171,7 +171,8 @@ private boolean nextRow(ConsumerRecord<byte[], byte[]> message)
         long timeStamp = message.timestamp() * MICROSECONDS_PER_MILLISECOND;

         Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedKey = keyDecoder.decodeRow(keyData);
-        Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue = messageDecoder.decodeRow(messageData);
+        // tombstone message has null value body
+        Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue = message.value() == null ? Optional.empty() : messageDecoder.decodeRow(messageData);

         Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = columnHandles.stream()
                 .filter(KafkaColumnHandle::isInternal)
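For context on the hunk above: a Kafka tombstone is a record whose value payload is null (on compacted topics the key marks a deletion). A minimal standalone sketch of the same null-value check, with hypothetical handleDelete/handleUpsert callbacks standing in for the connector's key/value decoding:

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TombstoneAwareLoop
{
    // Sketch only: mirrors the check added above - a null value body means
    // the record is a tombstone, so there is nothing to hand to a value decoder.
    static void pollOnce(KafkaConsumer<byte[], byte[]> consumer)
    {
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(1))) {
            if (record.value() == null) {
                handleDelete(record.key()); // tombstone: deletion marker for the key
            }
            else {
                handleUpsert(record.key(), record.value());
            }
        }
    }

    static void handleDelete(byte[] key) {}

    static void handleUpsert(byte[] key, byte[] value) {}
}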
@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
+import java.util.stream.LongStream;

 import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
 import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY;
@@ -62,13 +63,13 @@ public class TestKafkaWithConfluentSchemaRegistryMinimalFunctionality
     private static final int MESSAGE_COUNT = 100;
     private static final Schema INITIAL_SCHEMA = SchemaBuilder.record(RECORD_NAME)
             .fields()
-            .name("col_1").type().longType().noDefault()
-            .name("col_2").type().stringType().noDefault()
+            .name("col_1").type().nullable().longType().noDefault()
+            .name("col_2").type().nullable().stringType().noDefault()
             .endRecord();
     private static final Schema EVOLVED_SCHEMA = SchemaBuilder.record(RECORD_NAME)
             .fields()
-            .name("col_1").type().longType().noDefault()
-            .name("col_2").type().stringType().noDefault()
+            .name("col_1").type().nullable().longType().noDefault()
+            .name("col_2").type().nullable().stringType().noDefault()
             .name("col_3").type().optional().doubleType()
             .endRecord();

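A side note on the schema change above, since it drives the rest of the diff: Avro's nullable() wraps the base type in a union with null, which is what allows col_1 and col_2 to come back as SQL NULL. A quick sketch in plain Avro (names are illustrative, no Trino involved):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullableFieldDemo
{
    public static void main(String[] args)
    {
        Schema schema = SchemaBuilder.record("demo")
                .fields()
                .name("col_1").type().nullable().longType().noDefault()
                .endRecord();

        // nullable() produces a union of the base type and null;
        // this prints ["long","null"]
        System.out.println(schema.getField("col_1").schema());
    }
}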
@@ -114,6 +115,65 @@ public void testTopicWithKeySubject()
                 .buildOrThrow());
     }

+    @Test
+    public void testTopicWithTombstone()
+    {
+        String topicName = "topic-tombstone-" + randomNameSuffix();
+
+        assertNotExists(topicName);
+
+        Map<String, String> producerConfig = schemaRegistryAwareProducer(testingKafka)
+                .put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
+                .put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
+                .buildOrThrow();
+
+        List<ProducerRecord<Long, GenericRecord>> messages = createMessages(topicName, 2, true);
+        testingKafka.sendMessages(messages.stream(), producerConfig);
+
+        // sending tombstone message (null value) for existing key,
+        // to be differentiated from simple null value message by message corrupted field
+        testingKafka.sendMessages(LongStream.of(1).mapToObj(id -> new ProducerRecord<>(topicName, id, null)), producerConfig);
+
+        waitUntilTableExists(topicName);
+
+        // tombstone message should have message corrupt field - true
+        QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner());
+        queryAssertions.query(format("SELECT \"%s-key\", col_1, col_2, _message_corrupt FROM %s", topicName, toDoubleQuoted(topicName)))
+                .assertThat()
+                .containsAll("VALUES (CAST(0 as bigint), CAST(0 as bigint), VARCHAR 'string-0', false), (CAST(1 as bigint), CAST(100 as bigint), VARCHAR 'string-1', false), (CAST(1 as bigint), null, null, true)");
+    }
+
+    @Test
+    public void testTopicWithAllNullValues()
+    {
+        String topicName = "topic-tombstone-" + randomNameSuffix();
+
+        assertNotExists(topicName);
+
+        Map<String, String> producerConfig = schemaRegistryAwareProducer(testingKafka)
+                .put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
+                .put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName())
+                .buildOrThrow();
+
+        List<ProducerRecord<Long, GenericRecord>> messages = createMessages(topicName, 2, true);
+        testingKafka.sendMessages(messages.stream(), producerConfig);
+
+        // sending all null values for existing key,
+        // to be differentiated from tombstone by message corrupted field
+        testingKafka.sendMessages(LongStream.of(1).mapToObj(id -> new ProducerRecord<>(topicName, id, new GenericRecordBuilder(INITIAL_SCHEMA)
+                .set("col_1", null)
+                .set("col_2", null)
+                .build())), producerConfig);
+
+        waitUntilTableExists(topicName);
+
+        // simple all null values message should have message corrupt field - false
+        QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner());
+        queryAssertions.query(format("SELECT \"%s-key\", col_1, col_2, _message_corrupt FROM %s", topicName, toDoubleQuoted(topicName)))
+                .assertThat()
+                .containsAll("VALUES (CAST(0 as bigint), CAST(0 as bigint), VARCHAR 'string-0', false), (CAST(1 as bigint), CAST(100 as bigint), VARCHAR 'string-1', false), (CAST(1 as bigint), null, null, false)");
+    }
+
     @Test
     public void testTopicWithRecordNameStrategy()
     {
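The two tests added above turn on one distinction: a tombstone is a record whose value is null, while an "all null" message is a real Avro record whose fields are null, and the connector separates the two through _message_corrupt. A sketch of producing both kinds of record, assuming a producer configured with Long keys and Avro values as in the tests:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneVersusAllNull
{
    static void sendBoth(KafkaProducer<Long, GenericRecord> producer, String topic, Schema schema)
    {
        // Tombstone: the record value itself is null; the tests above expect
        // such a row to surface with _message_corrupt = true.
        producer.send(new ProducerRecord<>(topic, 1L, null));

        // All-null payload: a genuine record whose fields are null; the tests
        // above expect _message_corrupt = false for this row.
        producer.send(new ProducerRecord<>(topic, 1L, new GenericRecordBuilder(schema)
                .set("col_1", null)
                .set("col_2", null)
                .build()));
    }
}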
@@ -264,10 +324,12 @@ private static void addExpectedColumns(Schema schema, GenericRecord record, Immu
                 throw new IllegalArgumentException("Unsupported field: " + field);
             }
         }
-        else if (field.schema().getType().equals(Schema.Type.STRING)) {
+        else if (field.schema().getType().equals(Schema.Type.STRING)
+                || (field.schema().getType().equals(Schema.Type.UNION) && field.schema().getTypes().contains(Schema.create(Schema.Type.STRING)))) {
             columnsBuilder.add(format("VARCHAR '%s'", value));
         }
-        else if (field.schema().getType().equals(Schema.Type.LONG)) {
+        else if (field.schema().getType().equals(Schema.Type.LONG)
+                || (field.schema().getType().equals(Schema.Type.UNION) && field.schema().getTypes().contains(Schema.create(Schema.Type.LONG)))) {
             columnsBuilder.add(format("CAST(%s AS bigint)", value));
         }
         else {
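The branches added here accept union schemas such as ["long","null"], which the nullable() fields earlier in the diff now produce. The same membership test, pulled out as a standalone hypothetical helper:

import org.apache.avro.Schema;

public class UnionTypeCheck
{
    // Mirrors the condition added above: a field counts as long-typed if its
    // schema is LONG outright, or a union containing LONG (as produced by
    // nullable() on the schemas earlier in this diff).
    static boolean isLongLike(Schema fieldSchema)
    {
        return fieldSchema.getType() == Schema.Type.LONG
                || (fieldSchema.getType() == Schema.Type.UNION
                        && fieldSchema.getTypes().contains(Schema.create(Schema.Type.LONG)));
    }
}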
