[FIX][MAIN][avro.PulsarAvro*] Checkstyle violations
eaba committed Oct 5, 2024
1 parent 456c94b commit e0de330
Showing 3 changed files with 125 additions and 52 deletions.
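For readers unfamiliar with the Trino-style Checkstyle rules this commit enforces, the toy class below is a hypothetical sketch (none of these names exist in the repository) showing the three patterns visible throughout the diff: explicit imports instead of wildcard imports, the opening brace of class and method declarations on its own line, and else/catch beginning a new line rather than sharing one with the closing brace.

// StyleExample.java: hypothetical illustration only, not part of the commit.
import java.util.List; // explicit import, replacing a wildcard such as "import java.util.*;"

public class StyleExample
{
    private final List<String> names;

    public StyleExample(List<String> names)
    {
        this.names = names;
    }

    // Control-flow braces stay cuddled, but "else" and "catch" start a new line.
    public int countNumeric()
    {
        int total = 0;
        for (String name : names) {
            try {
                Integer.parseInt(name);
                total++;
            }
            catch (NumberFormatException e) {
                // not numeric, skip
            }
        }
        return total;
    }
}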
@@ -22,8 +22,27 @@
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.type.*;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.Int128;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
import io.trino.spi.type.SmallintType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.Timestamps;
import io.trino.spi.type.TinyintType;
import io.trino.spi.type.Type;
import io.trino.spi.type.UuidType;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
@@ -55,7 +74,8 @@
* 2) support {@link RealType}.
* 3) support {@link DecimalType}.
*/
public class PulsarAvroColumnDecoder {
public class PulsarAvroColumnDecoder
{
private static final Set<Type> SUPPORTED_PRIMITIVE_TYPES = ImmutableSet.of(
BooleanType.BOOLEAN,
TinyintType.TINYINT,
@@ -74,7 +94,8 @@ public class PulsarAvroColumnDecoder {
private final String columnMapping;
private final String columnName;

public PulsarAvroColumnDecoder(DecoderColumnHandle columnHandle) {
public PulsarAvroColumnDecoder(DecoderColumnHandle columnHandle)
{
try {
requireNonNull(columnHandle, "columnHandle is null");
this.columnType = columnHandle.getType();
@@ -90,12 +111,14 @@ public PulsarAvroColumnDecoder(DecoderColumnHandle columnHandle) {
"mapping not defined for column '%s'", columnName);
checkArgument(isSupportedType(columnType),
"Unsupported column type '%s' for column '%s'", columnType, columnName);
} catch (IllegalArgumentException e) {
}
catch (IllegalArgumentException e) {
throw new TrinoException(GENERIC_USER_ERROR, e);
}
}

private static Object locateNode(GenericRecord element, String columnMapping) {
private static Object locateNode(GenericRecord element, String columnMapping)
{
Object value = element;
for (String pathElement : Splitter.on('/').omitEmptyStrings().split(columnMapping)) {
if (value == null) {
@@ -106,15 +129,17 @@ private static Object locateNode(GenericRecord element, String columnMapping) {
return value;
}

private static Slice getSlice(Object value, Type type, String columnName) {
private static Slice getSlice(Object value, Type type, String columnName)
{
if (type instanceof VarcharType && (value instanceof CharSequence || value instanceof GenericEnumSymbol)) {
return truncateToLength(utf8Slice(value.toString()), type);
}

if (type instanceof VarbinaryType) {
if (value instanceof ByteBuffer) {
return Slices.wrappedHeapBuffer((ByteBuffer) value);
} else if (value instanceof GenericFixed) {
}
else if (value instanceof GenericFixed) {
return Slices.wrappedBuffer(((GenericFixed) value).bytes());
}
}
@@ -128,7 +153,8 @@ private static Slice getSlice(Object value, Type type, String columnName) {
value.getClass(), type, columnName));
}

private static Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName) {
private static Block serializeObject(BlockBuilder builder, Object value, Type type, String columnName)
{
if (type instanceof ArrayType) {
return serializeList(builder, value, type, columnName);
}
@@ -145,7 +171,8 @@ private static Block serializeObject(BlockBuilder builder, Object value, Type ty
return null;
}

private static Block serializeList(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) {
private static Block serializeList(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
{
if (value == null) {
checkState(parentBlockBuilder != null, "parentBlockBuilder is null");
parentBlockBuilder.appendNull();
@@ -167,11 +194,13 @@ private static Block serializeList(BlockBuilder parentBlockBuilder, Object value
}

private static Block serializeLongDecimal(
BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) {
BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
{
final BlockBuilder blockBuilder;
if (parentBlockBuilder != null) {
blockBuilder = parentBlockBuilder;
} else {
}
else {
blockBuilder = type.createBlockBuilder(null, 1);
}
final ByteBuffer buffer = (ByteBuffer) value;
@@ -182,7 +211,8 @@ private static Block serializeLongDecimal(
return null;
}

private static void serializePrimitive(BlockBuilder blockBuilder, Object value, Type type, String columnName) {
private static void serializePrimitive(BlockBuilder blockBuilder, Object value, Type type, String columnName)
{
requireNonNull(blockBuilder, "parent blockBuilder is null");

if (value == null) {
@@ -232,7 +262,8 @@ private static void serializePrimitive(BlockBuilder blockBuilder, Object value,
value.getClass(), type, columnName));
}

private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) {
private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
{
if (value == null) {
checkState(parentBlockBuilder != null, "parentBlockBuilder is null");
parentBlockBuilder.appendNull();
@@ -247,11 +278,12 @@ private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value,
BlockBuilder blockBuilder;
if (parentBlockBuilder != null) {
blockBuilder = parentBlockBuilder;
} else {
}
else {
blockBuilder = type.createBlockBuilder(null, 1);
}

BlockBuilder entryBuilder = blockBuilder.newBlockBuilderLike(blockBuilder.getPositionCount(), null);//.beginBlockEntry();
BlockBuilder entryBuilder = blockBuilder.newBlockBuilderLike(blockBuilder.getPositionCount(), null); //.beginBlockEntry();
for (Map.Entry<?, ?> entry : map.entrySet()) {
if (entry.getKey() != null) {
keyType.writeSlice(entryBuilder, truncateToLength(utf8Slice(entry.getKey().toString()), keyType));
@@ -261,12 +293,13 @@ private static Block serializeMap(BlockBuilder parentBlockBuilder, Object value,
//blockBuilder.closeEntry();

if (parentBlockBuilder == null) {
return blockBuilder.build();//.getObject(0, Block.class);
return blockBuilder.build(); //.getObject(0, Block.class);
}
return null;
}

private static Block serializeRow(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName) {
private static Block serializeRow(BlockBuilder parentBlockBuilder, Object value, Type type, String columnName)
{
if (value == null) {
checkState(parentBlockBuilder != null, "parent block builder is null");
parentBlockBuilder.appendNull();
@@ -276,10 +309,11 @@ private static Block serializeRow(BlockBuilder parentBlockBuilder, Object value,
BlockBuilder blockBuilder;
if (parentBlockBuilder != null) {
blockBuilder = parentBlockBuilder;
} else {
}
else {
blockBuilder = type.createBlockBuilder(null, 1);
}
BlockBuilder singleRowBuilder = blockBuilder.newBlockBuilderLike(blockBuilder.getPositionCount(), null);//.beginBlockEntry();
BlockBuilder singleRowBuilder = blockBuilder.newBlockBuilderLike(blockBuilder.getPositionCount(), null); //.beginBlockEntry();
GenericRecord record = (GenericRecord) value;
List<Field> fields = ((RowType) type).getFields();
for (Field field : fields) {
@@ -288,12 +322,13 @@ private static Block serializeRow(BlockBuilder parentBlockBuilder, Object value,
}
//blockBuilder.closeEntry();
if (parentBlockBuilder == null) {
return blockBuilder.build();//.getObject(0, Block.class);
return blockBuilder.build(); //.getObject(0, Block.class);
}
return null;
}

private boolean isSupportedType(Type type) {
private boolean isSupportedType(Type type)
{
if (isSupportedPrimitive(type)) {
return true;
}
@@ -324,34 +359,40 @@ private boolean isSupportedType(Type type) {
return false;
}

private boolean isSupportedPrimitive(Type type) {
private boolean isSupportedPrimitive(Type type)
{
return type instanceof VarcharType || type instanceof DecimalType || SUPPORTED_PRIMITIVE_TYPES.contains(type);
}

public FieldValueProvider decodeField(GenericRecord avroRecord) {
public FieldValueProvider decodeField(GenericRecord avroRecord)
{
Object avroColumnValue = locateNode(avroRecord, columnMapping);
return new ObjectValueProvider(avroColumnValue, columnType, columnName);
}

private static class ObjectValueProvider
extends FieldValueProvider {
extends FieldValueProvider
{
private final Object value;
private final Type columnType;
private final String columnName;

public ObjectValueProvider(Object value, Type columnType, String columnName) {
public ObjectValueProvider(Object value, Type columnType, String columnName)
{
this.value = value;
this.columnType = columnType;
this.columnName = columnName;
}

@Override
public boolean isNull() {
public boolean isNull()
{
return value == null;
}

@Override
public double getDouble() {
public double getDouble()
{
if (value instanceof Double || value instanceof Float) {
return ((Number) value).doubleValue();
}
@@ -361,7 +402,8 @@ public double getDouble() {
}

@Override
public boolean getBoolean() {
public boolean getBoolean()
{
if (value instanceof Boolean) {
return (Boolean) value;
}
@@ -371,7 +413,8 @@ public boolean getBoolean() {
}

@Override
public long getLong() {
public long getLong()
{
if (value instanceof Long || value instanceof Integer) {
final long payload = ((Number) value).longValue();
if (TimestampType.TIMESTAMP_MILLIS.equals(columnType)) {
@@ -400,12 +443,14 @@ public long getLong() {
}

@Override
public Slice getSlice() {
public Slice getSlice()
{
return PulsarAvroColumnDecoder.getSlice(value, columnType, columnName);
}

@Override
public Object getObject() {
public Object getObject()
{
return serializeObject(null, value, columnType, columnName);
}
}
@@ -31,34 +31,37 @@
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.util.Objects.requireNonNull;

public class PulsarAvroRowDecoder implements PulsarRowDecoder {

public class PulsarAvroRowDecoder
implements PulsarRowDecoder
{
private final GenericAvroSchema genericAvroSchema;
private final Map<DecoderColumnHandle, PulsarAvroColumnDecoder> columnDecoders;

public PulsarAvroRowDecoder(GenericAvroSchema genericAvroSchema, Set<DecoderColumnHandle> columns) {
public PulsarAvroRowDecoder(GenericAvroSchema genericAvroSchema, Set<DecoderColumnHandle> columns)
{
this.genericAvroSchema = requireNonNull(genericAvroSchema, "genericAvroSchema is null");
columnDecoders = columns.stream()
.collect(toImmutableMap(identity(), this::createColumnDecoder));
}

private PulsarAvroColumnDecoder createColumnDecoder(DecoderColumnHandle columnHandle) {
private PulsarAvroColumnDecoder createColumnDecoder(DecoderColumnHandle columnHandle)
{
return new PulsarAvroColumnDecoder(columnHandle);
}

/**
* decode ByteBuf by {@link org.apache.pulsar.client.api.schema.GenericSchema}.
*
* @param byteBuf
* @return
*/
@Override
public Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(ByteBuf byteBuf) {
public Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(ByteBuf byteBuf)
{
GenericRecord avroRecord;
try {
GenericAvroRecord record = (GenericAvroRecord) genericAvroSchema.decode(byteBuf.array());
avroRecord = record.getAvroRecord();
} catch (Exception e) {
}
catch (Exception e) {
e.printStackTrace();
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Decoding avro record failed.", e);
}
@@ -67,5 +70,4 @@ public Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodeRow(ByteBuf
Map.Entry::getKey,
entry -> entry.getValue().decodeField(avroRecord))));
}

}
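
For context on the decodeRow signature shown above, here is a hedged usage sketch. The helper class, its method name, and the assumption that PulsarAvroRowDecoder is importable by the caller are illustrative only and are not part of this commit; only decodeRow, DecoderColumnHandle, and FieldValueProvider come from the diff itself.

// DecodeRowSketch.java: hypothetical caller, assuming a PulsarAvroRowDecoder has already been
// built from a GenericAvroSchema plus the set of DecoderColumnHandles (see its constructor above).
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.trino.decoder.DecoderColumnHandle;
import io.trino.decoder.FieldValueProvider;

import java.util.Map;
import java.util.Optional;

public final class DecodeRowSketch
{
    private DecodeRowSketch() {}

    // payload is assumed to hold the raw Avro-encoded bytes of a Pulsar message value.
    static void printNullFlags(PulsarAvroRowDecoder decoder, byte[] payload)
    {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(payload);
        Optional<Map<DecoderColumnHandle, FieldValueProvider>> row = decoder.decodeRow(byteBuf);
        row.ifPresent(columns -> columns.forEach((handle, provider) ->
                // FieldValueProvider exposes isNull(), getLong(), getSlice(), getObject(), ...
                // as implemented by ObjectValueProvider in the first file of this commit.
                System.out.println(handle.getName() + " isNull=" + provider.isNull())));
    }
}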