Skip to content

Commit

Permalink
Update BigQueryStorageAvroPageSource to SqlRow
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Oct 13, 2023
1 parent 17005ae commit 4324764
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
*/
package io.trino.plugin.bigquery;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.type.ArrayType;
Expand All @@ -28,7 +26,6 @@
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.arrow.memory.ArrowBuf;
Expand Down Expand Up @@ -57,10 +54,8 @@
import java.util.List;
import java.util.function.Consumer;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.plugin.bigquery.BigQueryUtil.toBigQueryColumnName;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
Expand All @@ -83,8 +78,6 @@
import static org.apache.arrow.compression.CommonsCompressionFactory.INSTANCE;
import static org.apache.arrow.vector.complex.BaseRepeatedValueVector.OFFSET_WIDTH;
import static org.apache.arrow.vector.types.Types.MinorType.DECIMAL256;
import static org.apache.arrow.vector.types.Types.MinorType.LIST;
import static org.apache.arrow.vector.types.Types.MinorType.STRUCT;

public class BigQueryArrowToPageConverter
implements AutoCloseable
Expand Down Expand Up @@ -170,8 +163,11 @@ else if (javaType == Slice.class) {
else if (javaType == LongTimestampWithTimeZone.class) {
writeVectorValues(output, vector, index -> writeObjectTimestampWithTimezone(output, type, vector, index), offset, length);
}
else if (javaType == Block.class) {
writeVectorValues(output, vector, index -> writeBlock(output, type, vector, index), offset, length);
else if (type instanceof ArrayType arrayType) {
writeVectorValues(output, vector, index -> writeArrayBlock(output, arrayType, vector, index), offset, length);
}
else if (type instanceof RowType rowType) {
writeVectorValues(output, vector, index -> writeRowBlock(output, rowType, vector, index), offset, length);
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
Expand Down Expand Up @@ -237,24 +233,10 @@ private void writeObjectTimestampWithTimezone(BlockBuilder output, Type type, Fi
type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY));
}

private void writeBlock(BlockBuilder output, Type type, FieldVector vector, int index)
{
if (type instanceof ArrayType && vector.getMinorType() == LIST) {
writeArrayBlock(output, type, vector, index);
return;
}
if (type instanceof RowType && vector.getMinorType() == STRUCT) {
writeRowBlock(output, type, vector, index);
return;
}
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
}

private void writeArrayBlock(BlockBuilder output, Type type, FieldVector vector, int index)
private void writeArrayBlock(BlockBuilder output, ArrayType arrayType, FieldVector vector, int index)
{
Type elementType = arrayType.getElementType();
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
Type elementType = getOnlyElement(type.getTypeParameters());

ArrowBuf offsetBuffer = vector.getOffsetBuffer();

int start = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
Expand All @@ -270,20 +252,14 @@ private void writeArrayBlock(BlockBuilder output, Type type, FieldVector vector,
});
}

private void writeRowBlock(BlockBuilder output, Type type, FieldVector vector, int index)
private void writeRowBlock(BlockBuilder output, RowType rowType, FieldVector vector, int index)
{
List<RowType.Field> fields = rowType.getFields();
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
ImmutableList.Builder<String> fieldNamesBuilder = ImmutableList.builder();
for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) {
TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i);
fieldNamesBuilder.add(parameter.getNamedTypeSignature().getName().orElse("field" + i));
}
List<String> fieldNames = fieldNamesBuilder.build();
checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldNames size differs from type %s type parameters size", type);

for (int i = 0; i < type.getTypeParameters().size(); i++) {
FieldVector innerVector = ((StructVector) vector).getChild(fieldNames.get(i));
convertType(fieldBuilders.get(i), type.getTypeParameters().get(i), innerVector, index, 1);
for (int i = 0; i < fields.size(); i++) {
RowType.Field field = fields.get(i);
FieldVector innerVector = ((StructVector) vector).getChild(field.getName().orElse("field" + i));
convertType(fieldBuilders.get(i), field.getType(), innerVector, index, 1);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
Expand Down Expand Up @@ -208,8 +207,22 @@ else if (type.getJavaType() == Int128.class) {
else if (javaType == Slice.class) {
writeSlice(output, type, value);
}
else if (javaType == Block.class) {
writeBlock(output, type, value);
else if (type instanceof ArrayType arrayType) {
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
Type elementType = arrayType.getElementType();
for (FieldValue element : value.getRepeatedValue()) {
appendTo(elementType, element, elementBuilder);
}
});
}
else if (type instanceof RowType rowType) {
FieldValueList record = value.getRecordValue();
List<RowType.Field> fields = rowType.getFields();
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
for (int index = 0; index < fields.size(); index++) {
appendTo(fields.get(index).getType(), record.get(index), fieldBuilders.get(index));
}
});
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
Expand All @@ -233,28 +246,6 @@ else if (type instanceof VarbinaryType) {
}
}

private void writeBlock(BlockBuilder output, Type type, FieldValue value)
{
if (type instanceof ArrayType) {
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
for (FieldValue element : value.getRepeatedValue()) {
appendTo(type.getTypeParameters().get(0), element, elementBuilder);
}
});
return;
}
if (type instanceof RowType) {
FieldValueList record = value.getRecordValue();
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
for (int index = 0; index < type.getTypeParameters().size(); index++) {
appendTo(type.getTypeParameters().get(index), record.get(index), fieldBuilders.get(index));
}
});
return;
}
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
Expand All @@ -32,8 +31,8 @@
import io.trino.spi.type.Int128;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.RowType;
import io.trino.spi.type.RowType.Field;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeSignatureParameter;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.avro.Conversions.DecimalConversion;
Expand All @@ -48,7 +47,6 @@
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -206,8 +204,11 @@ else if (javaType == LongTimestampWithTimeZone.class) {
int picosOfMillis = toIntExact(floorMod(epochMicros, MICROSECONDS_PER_MILLISECOND)) * PICOSECONDS_PER_MICROSECOND;
type.writeObject(output, fromEpochMillisAndFraction(floorDiv(epochMicros, MICROSECONDS_PER_MILLISECOND), picosOfMillis, UTC_KEY));
}
else if (javaType == Block.class) {
writeBlock(output, type, value);
else if (type instanceof ArrayType arrayType) {
writeArray((ArrayBlockBuilder) output, (List<?>) value, arrayType);
}
else if (type instanceof RowType rowType) {
writeRow((RowBlockBuilder) output, rowType, (GenericRecord) value);
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
Expand Down Expand Up @@ -249,31 +250,25 @@ private static void writeObject(BlockBuilder output, Type type, Object value)
}
}

private void writeBlock(BlockBuilder output, Type type, Object value)
private void writeArray(ArrayBlockBuilder output, List<?> value, ArrayType arrayType)
{
if (type instanceof ArrayType && value instanceof List<?>) {
((ArrayBlockBuilder) output).buildEntry(elementBuilder -> {
for (Object element : (List<?>) value) {
appendTo(type.getTypeParameters().get(0), element, elementBuilder);
}
});
return;
}
if (type instanceof RowType && value instanceof GenericRecord record) {
((RowBlockBuilder) output).buildEntry(fieldBuilders -> {
List<String> fieldNames = new ArrayList<>();
for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) {
TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i);
fieldNames.add(parameter.getNamedTypeSignature().getName().orElse("field" + i));
}
checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldName doesn't match with type size : %s", type);
for (int index = 0; index < type.getTypeParameters().size(); index++) {
appendTo(type.getTypeParameters().get(index), record.get(fieldNames.get(index)), fieldBuilders.get(index));
}
});
return;
}
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
Type elementType = arrayType.getElementType();
output.buildEntry(elementBuilder -> {
for (Object element : value) {
appendTo(elementType, element, elementBuilder);
}
});
}

private void writeRow(RowBlockBuilder output, RowType rowType, GenericRecord record)
{
List<Field> fields = rowType.getFields();
output.buildEntry(fieldBuilders -> {
for (int index = 0; index < fields.size(); index++) {
Field field = fields.get(index);
appendTo(field.getType(), record.get(field.getName().orElse("field" + index)), fieldBuilders.get(index));
}
});
}

@Override
Expand Down

0 comments on commit 4324764

Please sign in to comment.