Skip to content

Commit

Permalink
Catch decode error in Avro decoder
Browse files Browse the repository at this point in the history
We should not fail query in case of any problems,
during decode phase of tombstone messages
  • Loading branch information
vlad-lyutenko committed May 17, 2023
1 parent 9054f64 commit 54bb219
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ public GenericRecordRowDecoder(AvroDeserializer<GenericRecord> deserializer, Set
@Override
public Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(byte[] data)
{
GenericRecord avroRecord = deserializer.deserialize(data);
GenericRecord avroRecord;
try {
avroRecord = deserializer.deserialize(data);
}
catch (RuntimeException e) {
return Optional.empty();
}
return Optional.of(columnDecoders.stream()
.collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().decodeField(avroRecord))));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ public SingleValueRowDecoder(AvroDeserializer<Object> deserializer, DecoderColum
@Override
public Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(byte[] data)
{
Object avroValue = deserializer.deserialize(data);
Object avroValue;
try {
avroValue = deserializer.deserialize(data);
}
catch (RuntimeException e) {
return Optional.empty();
}
return Optional.of(ImmutableMap.of(column, new AvroColumnDecoder.ObjectValueProvider(avroValue, column.getType(), column.getName())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaBuilder.FieldAssembler;
Expand Down Expand Up @@ -166,7 +165,7 @@ private static Map<DecoderColumnHandle, FieldValueProvider> decodeRow(byte[] avr
{
RowDecoder rowDecoder = DECODER_FACTORY.create(dataParams, columns);
return rowDecoder.decodeRow(avroData)
.orElseThrow(AssertionError::new);
.orElseThrow(() -> new IllegalStateException("Problems during decode phase"));
}

private static byte[] buildAvroData(Schema schema, String name, Object value)
Expand Down Expand Up @@ -364,10 +363,8 @@ public void testSchemaEvolutionToIncompatibleType()
.toString();

assertThatThrownBy(() -> decodeRow(originalIntData, ImmutableSet.of(stringColumnReadingIntData), ImmutableMap.of(DATA_SCHEMA, changedTypeSchema)))
.isInstanceOf(TrinoException.class)
.hasCauseExactlyInstanceOf(AvroTypeException.class)
.hasStackTraceContaining("Found int, expecting string")
.hasMessageMatching("Decoding Avro record failed.");
.isInstanceOf(IllegalStateException.class)
.hasMessageMatching("Problems during decode phase");
}

@Test
Expand Down

0 comments on commit 54bb219

Please sign in to comment.