Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add projection push down for STRUCT field in big-query connector #23443

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.bigquery;

import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;
import io.trino.spi.PageBuilder;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -82,18 +83,12 @@ public class BigQueryArrowToPageConverter
private final BigQueryTypeManager typeManager;
private final VectorSchemaRoot root;
private final VectorLoader loader;
private final List<Type> columnTypes;
private final List<String> columnNames;
private final List<BigQueryColumnHandle> columns;

public BigQueryArrowToPageConverter(BigQueryTypeManager typeManager, BufferAllocator allocator, Schema schema, List<BigQueryColumnHandle> columns)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.columnTypes = requireNonNull(columns, "columns is null").stream()
.map(BigQueryColumnHandle::trinoType)
.collect(toImmutableList());
this.columnNames = columns.stream()
.map(BigQueryColumnHandle::name)
.collect(toImmutableList());
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns is null"));
List<FieldVector> vectors = schema.getFields().stream()
.map(field -> field.createVector(allocator))
.collect(toImmutableList());
Expand All @@ -106,17 +101,34 @@ public void convert(PageBuilder pageBuilder, ArrowRecordBatch batch)
loader.load(batch);
pageBuilder.declarePositions(root.getRowCount());

for (int column = 0; column < columnTypes.size(); column++) {
for (int column = 0; column < columns.size(); column++) {
BigQueryColumnHandle columnHandle = columns.get(column);
FieldVector fieldVector = getFieldVector(root, columnHandle);
convertType(pageBuilder.getBlockBuilder(column),
columnTypes.get(column),
root.getVector(toBigQueryColumnName(columnNames.get(column))),
columnHandle.trinoType(),
fieldVector,
0,
root.getVector(toBigQueryColumnName(columnNames.get(column))).getValueCount());
fieldVector.getValueCount());
}

root.clear();
}

private static FieldVector getFieldVector(VectorSchemaRoot root, BigQueryColumnHandle columnHandle)
{
FieldVector fieldVector = root.getVector(toBigQueryColumnName(columnHandle.name()));

for (String dereferenceName : columnHandle.dereferenceNames()) {
for (FieldVector child : fieldVector.getChildrenFromFields()) {
if (child.getField().getName().equals(dereferenceName)) {
fieldVector = child;
break;
}
}
}
return fieldVector;
}

private void convertType(BlockBuilder output, Type type, FieldVector vector, int offset, int length)
{
Class<?> javaType = type.getJavaType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.base.Joiner;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
Expand Down Expand Up @@ -246,7 +247,7 @@ public Optional<TableInfo> getTable(TableId remoteTableId)
}
}

public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List<String> requiredColumns, Optional<String> filter)
public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List<BigQueryColumnHandle> requiredColumns, Optional<String> filter)
{
String query = selectSql(remoteTableId.getTableId(), requiredColumns, filter);
log.debug("query is %s", query);
Expand Down Expand Up @@ -466,10 +467,19 @@ public TableId getDestinationTable(String sql)
return requireNonNull(((QueryJobConfiguration) jobConfiguration).getDestinationTable(), "Cannot determine destination table for query");
}

public static String selectSql(TableId table, List<String> requiredColumns, Optional<String> filter)
public static String selectSql(TableId table, List<BigQueryColumnHandle> requiredColumns, Optional<String> filter)
{
String columns = requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));
return selectSql(table, columns, filter);
return selectSql(table,
requiredColumns.stream()
.map(column -> Joiner.on('.')
.join(ImmutableList.<String>builder()
.add(format("`%s`", column.name()))
.addAll(column.dereferenceNames().stream()
.map(dereferenceName -> format("`%s`", dereferenceName))
.collect(toImmutableList()))
.build()))
.collect(joining(",")),
filter);
}

public static String selectSql(TableId table, String formattedColumns, Optional<String> filter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
Expand All @@ -30,6 +31,7 @@

public record BigQueryColumnHandle(
String name,
List<String> dereferenceNames,
Type trinoType,
StandardSQLTypeName bigqueryType,
boolean isPushdownSupported,
Expand All @@ -44,6 +46,7 @@ public record BigQueryColumnHandle(
public BigQueryColumnHandle
{
requireNonNull(name, "name is null");
dereferenceNames = ImmutableList.copyOf(requireNonNull(dereferenceNames, "dereferenceNames is null"));
requireNonNull(trinoType, "trinoType is null");
requireNonNull(bigqueryType, "bigqueryType is null");
requireNonNull(mode, "mode is null");
Expand All @@ -62,6 +65,16 @@ public ColumnMetadata getColumnMetadata()
.build();
}

@JsonIgnore
public String getQualifiedName()
{
return Joiner.on('.')
.join(ImmutableList.<String>builder()
.add(name)
.addAll(dereferenceNames)
.build());
}

@JsonIgnore
public long getRetainedSizeInBytes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class BigQueryConfig
private String queryLabelName;
private String queryLabelFormat;
private boolean proxyEnabled;
private boolean projectionPushDownEnabled = true;
private int metadataParallelism = 2;

public Optional<String> getProjectId()
Expand Down Expand Up @@ -342,6 +343,19 @@ public BigQueryConfig setProxyEnabled(boolean proxyEnabled)
return this;
}

public boolean isProjectionPushdownEnabled()
{
return projectionPushDownEnabled;
}

@Config("bigquery.projection-pushdown-enabled")
@ConfigDescription("Dereference push down for ROW type")
public BigQueryConfig setProjectionPushdownEnabled(boolean projectionPushDownEnabled)
{
this.projectionPushDownEnabled = projectionPushDownEnabled;
return this;
}

@Min(1)
@Max(32)
public int getMetadataParallelism()
Expand Down
Loading