diff --git a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java index f74948922ce..2ad60769590 100644 --- a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java +++ b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/KafkaRecordSet.java @@ -171,7 +171,8 @@ private boolean nextRow(ConsumerRecord message) long timeStamp = message.timestamp() * MICROSECONDS_PER_MILLISECOND; Optional> decodedKey = keyDecoder.decodeRow(keyData); - Optional> decodedValue = messageDecoder.decodeRow(messageData); + // tombstone message has null value body + Optional> decodedValue = message.value() == null ? Optional.empty() : messageDecoder.decodeRow(messageData); Map currentRowValuesMap = columnHandles.stream() .filter(KafkaColumnHandle::isInternal) diff --git a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java index 71d1db23e70..a8df72849ec 100644 --- a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java +++ b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestKafkaWithConfluentSchemaRegistryMinimalFunctionality.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.stream.IntStream; +import java.util.stream.LongStream; import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG; import static io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY; @@ -62,13 +63,13 @@ public class TestKafkaWithConfluentSchemaRegistryMinimalFunctionality private static final int MESSAGE_COUNT = 100; private static final Schema INITIAL_SCHEMA = SchemaBuilder.record(RECORD_NAME) .fields() - .name("col_1").type().longType().noDefault() - .name("col_2").type().stringType().noDefault() + .name("col_1").type().nullable().longType().noDefault() + .name("col_2").type().nullable().stringType().noDefault() .endRecord(); private static final Schema EVOLVED_SCHEMA = SchemaBuilder.record(RECORD_NAME) .fields() - .name("col_1").type().longType().noDefault() - .name("col_2").type().stringType().noDefault() + .name("col_1").type().nullable().longType().noDefault() + .name("col_2").type().nullable().stringType().noDefault() .name("col_3").type().optional().doubleType() .endRecord(); @@ -114,6 +115,65 @@ public void testTopicWithKeySubject() .buildOrThrow()); } + @Test + public void testTopicWithTombstone() + { + String topicName = "topic-tombstone-" + randomNameSuffix(); + + assertNotExists(topicName); + + Map producerConfig = schemaRegistryAwareProducer(testingKafka) + .put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .buildOrThrow(); + + List> messages = createMessages(topicName, 2, true); + testingKafka.sendMessages(messages.stream(), producerConfig); + + // sending tombstone message (null value) for existing key, + // to be differentiated from simple null value message by message corrupted field + testingKafka.sendMessages(LongStream.of(1).mapToObj(id -> new ProducerRecord<>(topicName, id, null)), producerConfig); + + waitUntilTableExists(topicName); + + // tombstone message should have message corrupt field - true + QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner()); + queryAssertions.query(format("SELECT \"%s-key\", col_1, col_2, _message_corrupt FROM %s", topicName, toDoubleQuoted(topicName))) + .assertThat() + .containsAll("VALUES (CAST(0 as bigint), CAST(0 as bigint), VARCHAR 'string-0', false), (CAST(1 as bigint), CAST(100 as bigint), VARCHAR 'string-1', false), (CAST(1 as bigint), null, null, true)"); + } + + @Test + public void testTopicWithAllNullValues() + { + String topicName = "topic-tombstone-" + randomNameSuffix(); + + assertNotExists(topicName); + + Map producerConfig = schemaRegistryAwareProducer(testingKafka) + .put(KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()) + .buildOrThrow(); + + List> messages = createMessages(topicName, 2, true); + testingKafka.sendMessages(messages.stream(), producerConfig); + + // sending all null values for existing key, + // to be differentiated from tombstone by message corrupted field + testingKafka.sendMessages(LongStream.of(1).mapToObj(id -> new ProducerRecord<>(topicName, id, new GenericRecordBuilder(INITIAL_SCHEMA) + .set("col_1", null) + .set("col_2", null) + .build())), producerConfig); + + waitUntilTableExists(topicName); + + // simple all null values message should have message corrupt field - false + QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner()); + queryAssertions.query(format("SELECT \"%s-key\", col_1, col_2, _message_corrupt FROM %s", topicName, toDoubleQuoted(topicName))) + .assertThat() + .containsAll("VALUES (CAST(0 as bigint), CAST(0 as bigint), VARCHAR 'string-0', false), (CAST(1 as bigint), CAST(100 as bigint), VARCHAR 'string-1', false), (CAST(1 as bigint), null, null, false)"); + } + @Test public void testTopicWithRecordNameStrategy() { @@ -264,10 +324,12 @@ private static void addExpectedColumns(Schema schema, GenericRecord record, Immu throw new IllegalArgumentException("Unsupported field: " + field); } } - else if (field.schema().getType().equals(Schema.Type.STRING)) { + else if (field.schema().getType().equals(Schema.Type.STRING) + || (field.schema().getType().equals(Schema.Type.UNION) && field.schema().getTypes().contains(Schema.create(Schema.Type.STRING)))) { columnsBuilder.add(format("VARCHAR '%s'", value)); } - else if (field.schema().getType().equals(Schema.Type.LONG)) { + else if (field.schema().getType().equals(Schema.Type.LONG) + || (field.schema().getType().equals(Schema.Type.UNION) && field.schema().getTypes().contains(Schema.create(Schema.Type.LONG)))) { columnsBuilder.add(format("CAST(%s AS bigint)", value)); } else {