diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java
index 1781c2d6f0b6..9b8598fd6a89 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java
@@ -416,11 +416,17 @@ public long executeUpdate(ConnectorSession session, QueryJobConfiguration job)
     }
 
     public TableResult executeQuery(ConnectorSession session, String sql)
+    {
+        return executeQuery(session, sql, null);
+    }
+
+    public TableResult executeQuery(ConnectorSession session, String sql, Long maxResults)
     {
         log.debug("Execute query: %s", sql);
         QueryJobConfiguration job = QueryJobConfiguration.newBuilder(sql)
                 .setUseQueryCache(isQueryResultsCacheEnabled(session))
                 .setCreateDisposition(createDisposition(session))
+                .setMaxResults(maxResults)
                 .build();
         return execute(session, job);
     }
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java
index c91bfde2de7e..71e19df49e9f 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorFactory.java
@@ -19,6 +19,7 @@
 import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.trace.Tracer;
 import io.trino.spi.NodeManager;
+import io.trino.spi.catalog.CatalogName;
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorContext;
 import io.trino.spi.connector.ConnectorFactory;
@@ -53,6 +54,7 @@ public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
             binder.bind(NodeManager.class).toInstance(context.getNodeManager());
             binder.bind(OpenTelemetry.class).toInstance(context.getOpenTelemetry());
             binder.bind(Tracer.class).toInstance(context.getTracer());
+            binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName));
         });
 
         Injector injector = app
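The first hunk adds a page-size overload: `setMaxResults(Long)` on `QueryJobConfiguration` caps how many rows the BigQuery REST API returns per response page. It is not a `LIMIT` — every row is still delivered, one page at a time — and passing `null` keeps the server-side default. The second hunk binds the catalog name into the injector so downstream components (see the executor provider below) can label per-catalog resources. A minimal standalone sketch of the paging behavior, assuming only the google-cloud-bigquery client library; the query text and the 8192 page size are illustrative, not part of this change:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class MaxResultsSketch
{
    public static void main(String[] args)
            throws InterruptedException
    {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        QueryJobConfiguration job = QueryJobConfiguration.newBuilder("SELECT x FROM UNNEST([1, 2, 3]) AS x")
                .setMaxResults(8192L) // rows per response page, not a row limit
                .build();
        TableResult page = bigQuery.query(job); // first page
        while (page != null) {
            page.getValues().forEach(row -> System.out.println(row.get(0).getLongValue()));
            page = page.hasNextPage() ? page.getNextPage() : null; // one RPC per page
        }
    }
}
```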
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java
index 0f977c7597a7..e1b1373453c0 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java
@@ -17,6 +17,7 @@
 import com.google.api.gax.rpc.HeaderProvider;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.inject.Binder;
+import com.google.inject.Key;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
@@ -29,11 +30,13 @@
 import io.trino.plugin.bigquery.procedure.ExecuteProcedure;
 import io.trino.plugin.bigquery.ptf.Query;
 import io.trino.spi.NodeManager;
+import io.trino.spi.catalog.CatalogName;
 import io.trino.spi.function.table.ConnectorTableFunction;
 import io.trino.spi.procedure.Procedure;
 
 import java.lang.management.ManagementFactory;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -45,6 +48,7 @@
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static io.trino.plugin.base.ClosingBinder.closingBinder;
 import static io.trino.plugin.bigquery.BigQueryConfig.ARROW_SERIALIZATION_ENABLED;
+import static java.util.concurrent.Executors.newCachedThreadPool;
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static java.util.stream.Collectors.toSet;
 
@@ -106,6 +110,7 @@ protected void setup(Binder binder)
                 }));
 
         closingBinder(binder).registerExecutor(ListeningExecutorService.class);
+        closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForBigQueryPageSource.class));
     }
 
     @Provides
@@ -129,6 +134,14 @@ public ListeningExecutorService provideListeningExecutor(BigQueryConfig config)
         return listeningDecorator(newFixedThreadPool(config.getMetadataParallelism(), daemonThreadsNamed("big-query-%s"))); // limit parallelism
     }
 
+    @Provides
+    @Singleton
+    @ForBigQueryPageSource
+    public ExecutorService provideExecutor(CatalogName catalogName)
+    {
+        return newCachedThreadPool(daemonThreadsNamed("bigquery-" + catalogName + "-%s"));
+    }
+
     /**
      * Apache Arrow requires reflective access to certain Java internals prohibited since Java 17.
      * Adds an error to the {@code binder} if required --add-opens is not passed to the JVM.
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java
index b4af27b1038f..8340f253fdf1 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -43,19 +44,22 @@ public class BigQueryPageSourceProvider
     private final BigQueryTypeManager typeManager;
     private final int maxReadRowsRetries;
     private final boolean arrowSerializationEnabled;
+    private final ExecutorService executor;
 
     @Inject
     public BigQueryPageSourceProvider(
             BigQueryClientFactory bigQueryClientFactory,
             BigQueryReadClientFactory bigQueryReadClientFactory,
             BigQueryTypeManager typeManager,
-            BigQueryConfig config)
+            BigQueryConfig config,
+            @ForBigQueryPageSource ExecutorService executor)
     {
         this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory is null");
         this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.maxReadRowsRetries = config.getMaxReadRowsRetries();
         this.arrowSerializationEnabled = config.isArrowSerializationEnabled();
+        this.executor = requireNonNull(executor, "executor is null");
     }
 
     @Override
@@ -106,12 +110,14 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, BigQuerySplit split, List<BigQueryColumnHandle> columnHandles)
             return new BigQueryStorageArrowPageSource(
                     typeManager,
                     bigQueryReadClientFactory.create(session),
+                    executor,
                     maxReadRowsRetries,
                     split,
                     columnHandles);
         }
         return new BigQueryStorageAvroPageSource(
                 bigQueryReadClientFactory.create(session),
+                executor,
                 typeManager,
                 maxReadRowsRetries,
                 split,
@@ -124,6 +130,7 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQueryTableHandle table, List<BigQueryColumnHandle> columnHandles, Optional<String> filter)
                 session,
                 typeManager,
                 bigQueryClientFactory.create(session),
+                executor,
                 table,
                 columnHandles,
                 filter);
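The module wires a dedicated cached thread pool for page sources, qualified by the `@ForBigQueryPageSource` binding annotation (defined later in this change) and named after the catalog via the `CatalogName` bound in the factory; `closingBinder` registers the qualified `ExecutorService` so the pool is shut down with the connector. A plain-Guice sketch of the same wiring — `daemonThreadsNamed` and `ClosingBinder` are airlift/Trino utilities, so this approximates them by hand:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Singleton;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class ExecutorModuleSketch
        extends AbstractModule
{
    private final String catalogName;

    public ExecutorModuleSketch(String catalogName)
    {
        this.catalogName = catalogName;
    }

    @Provides
    @Singleton
    @ForBigQueryPageSource
    public ExecutorService provideExecutor()
    {
        AtomicLong counter = new AtomicLong();
        // Approximates daemonThreadsNamed("bigquery-" + catalogName + "-%s")
        return Executors.newCachedThreadPool(runnable -> {
            Thread thread = new Thread(runnable, "bigquery-" + catalogName + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
    }

    public static void main(String[] args)
    {
        Injector injector = Guice.createInjector(new ExecutorModuleSketch("example"));
        ExecutorService executor = injector.getInstance(Key.get(ExecutorService.class, ForBigQueryPageSource.class));
        executor.submit(() -> System.out.println(Thread.currentThread().getName()));
        executor.shutdownNow(); // ClosingBinder performs the equivalent at connector shutdown
    }
}
```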
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java
index 75bfbb50ef8b..ce89b253a17d 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java
@@ -44,9 +44,13 @@
 import java.time.format.DateTimeFormatterBuilder;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.concurrent.MoreFutures.getFutureValue;
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
 import static io.trino.plugin.bigquery.BigQueryTypeManager.toTrinoTimestamp;
@@ -67,6 +71,7 @@ public class BigQueryQueryPageSource
         implements ConnectorPageSource
 {
+    private static final long MAX_PAGE_ROW_COUNT = 8192L;
     private static final DateTimeFormatter TIME_FORMATTER = new DateTimeFormatterBuilder()
             .appendPattern("HH:mm:ss")
             .optionalStart()
@@ -78,23 +83,29 @@ public class BigQueryQueryPageSource
     private final List<BigQueryColumnHandle> columnHandles;
     private final PageBuilder pageBuilder;
     private final boolean isQueryFunction;
-    private final TableResult tableResult;
+    private final AtomicLong readTimeNanos = new AtomicLong();
+
+    private CompletableFuture<TableResult> tableResultFuture;
+    private TableResult tableResult;
     private boolean finished;
 
     public BigQueryQueryPageSource(
             ConnectorSession session,
             BigQueryTypeManager typeManager,
             BigQueryClient client,
+            ExecutorService executor,
             BigQueryTableHandle table,
             List<BigQueryColumnHandle> columnHandles,
             Optional<String> filter)
     {
+        requireNonNull(session, "session is null");
+        this.typeManager = requireNonNull(typeManager, "typeManager is null");
         requireNonNull(client, "client is null");
+        requireNonNull(executor, "executor is null");
         requireNonNull(table, "table is null");
-        requireNonNull(filter, "filter is null");
-        this.typeManager = requireNonNull(typeManager, "typeManager is null");
         this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");
+        requireNonNull(filter, "filter is null");
         this.pageBuilder = new PageBuilder(columnHandles.stream().map(BigQueryColumnHandle::trinoType).collect(toImmutableList()));
         this.isQueryFunction = table.relationHandle() instanceof BigQueryQueryRelationHandle;
         String sql = buildSql(
@@ -102,7 +113,12 @@ public BigQueryQueryPageSource(
                 client.getProjectId(),
                 ImmutableList.copyOf(columnHandles),
                 filter);
-        this.tableResult = client.executeQuery(session, sql);
+        this.tableResultFuture = CompletableFuture.supplyAsync(() -> {
+            long start = System.nanoTime();
+            TableResult result = client.executeQuery(session, sql, MAX_PAGE_ROW_COUNT);
+            readTimeNanos.addAndGet(System.nanoTime() - start);
+            return result;
+        }, executor);
     }
 
     private String buildSql(BigQueryTableHandle table, String projectId, List<BigQueryColumnHandle> columns, Optional<String> filter)
@@ -127,7 +143,7 @@ public long getCompletedBytes()
     @Override
     public long getReadTimeNanos()
     {
-        return 0;
+        return readTimeNanos.get();
     }
 
     @Override
@@ -146,7 +162,25 @@ public long getMemoryUsage()
     public Page getNextPage()
     {
         verify(pageBuilder.isEmpty());
-        for (FieldValueList record : tableResult.iterateAll()) {
+        if (tableResult == null) {
+            tableResult = getFutureValue(tableResultFuture);
+        }
+        else if (tableResult.hasNextPage()) {
+            long start = System.nanoTime();
+            tableResult = tableResult.getNextPage();
+            readTimeNanos.addAndGet(System.nanoTime() - start);
+        }
+        else {
+            finished = true;
+            return null;
+        }
+
+        long start = System.nanoTime();
+        List<FieldValueList> values = ImmutableList.copyOf(tableResult.getValues());
+        finished = !tableResult.hasNextPage();
+        readTimeNanos.addAndGet(System.nanoTime() - start);
+
+        for (FieldValueList record : values) {
             pageBuilder.declarePosition();
             for (int column = 0; column < columnHandles.size(); column++) {
                 BigQueryColumnHandle columnHandle = columnHandles.get(column);
@@ -155,7 +189,6 @@ public Page getNextPage()
                 appendTo(columnHandle.trinoType(), fieldValue, output);
             }
         }
-        finished = true;
 
         Page page = pageBuilder.build();
         pageBuilder.reset();
@@ -255,5 +288,15 @@ else if (type instanceof VarbinaryType) {
     }
 
     @Override
-    public void close() {}
+    public void close()
+    {
+        tableResultFuture.cancel(true);
+        tableResult = null;
+    }
+
+    @Override
+    public CompletableFuture<?> isBlocked()
+    {
+        return tableResultFuture;
+    }
 }
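Two behavior changes in this file. First, the old code walked `tableResult.iterateAll()` — which transparently fetches every page — inside a single `getNextPage()` call and built one giant page; the rewrite emits one Trino page per BigQuery response page and reports time spent waiting on the API through `getReadTimeNanos()`. Second, the query is now submitted from the constructor onto the shared executor, and the pending future is surfaced via `isBlocked()`, so a driver thread is not parked on the RPC. A stripped-down sketch of that handoff pattern, using standalone types rather than the connector's:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class AsyncFirstFetchSketch
{
    private final CompletableFuture<List<String>> firstPage;

    public AsyncFirstFetchSketch(ExecutorService executor)
    {
        // Kick off the slow call immediately; nothing blocks yet.
        firstPage = CompletableFuture.supplyAsync(this::runQuery, executor);
    }

    public CompletableFuture<?> isBlocked()
    {
        // The engine awaits this future instead of calling getNextPage() blindly.
        return firstPage;
    }

    public List<String> getNextPage()
    {
        // By contract this runs only after isBlocked() completed, so join() does not stall.
        return firstPage.join();
    }

    private List<String> runQuery()
    {
        return List.of("row1", "row2"); // stands in for client.executeQuery(session, sql, maxResults)
    }
}
```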
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java
index 712f437d51be..06508c6457a9 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java
@@ -13,21 +13,10 @@
  */
 package io.trino.plugin.bigquery;
 
-import com.google.cloud.bigquery.BigQueryException;
-import com.google.cloud.bigquery.TableDefinition;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.TableInfo;
-import com.google.cloud.bigquery.TableResult;
-import com.google.cloud.bigquery.storage.v1.ReadSession;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
-import com.google.protobuf.ByteString;
 import io.airlift.log.Logger;
 import io.airlift.units.Duration;
 import io.trino.spi.NodeManager;
-import io.trino.spi.TrinoException;
-import io.trino.spi.connector.ColumnHandle;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplitManager;
 import io.trino.spi.connector.ConnectorSplitSource;
@@ -35,31 +24,8 @@
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.DynamicFilter;
-import io.trino.spi.connector.FixedSplitSource;
-import io.trino.spi.predicate.TupleDomain;
-import org.apache.arrow.vector.ipc.ReadChannel;
-import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.List;
-import java.util.Optional;
-import java.util.OptionalInt;
-
-import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
-import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
-import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static com.google.common.collect.Iterables.getOnlyElement;
-import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
-import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
-import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
-import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
-import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
-import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
-import static org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeSchema;
 
 public class BigQuerySplitManager
         implements ConnectorSplitManager
@@ -99,152 +65,15 @@ public ConnectorSplitSource getSplits(
             Constraint constraint)
     {
         log.debug("getSplits(transaction=%s, session=%s, table=%s)", transaction, session, table);
-        BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) table;
-
-        TupleDomain<ColumnHandle> tableConstraint = bigQueryTableHandle.constraint();
-        Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
-
-        if (bigQueryTableHandle.isQueryRelation()) {
-            BigQueryQueryRelationHandle bigQueryQueryRelationHandle = bigQueryTableHandle.getRequiredQueryRelation();
-            List<BigQueryColumnHandle> columns = bigQueryTableHandle.projectedColumns().orElse(ImmutableList.of());
-            boolean useStorageApi = bigQueryQueryRelationHandle.isUseStorageApi();
-
-            // projectedColumnsNames can not be used for generating select sql because the query fails if it does not
-            // include a column name. eg: query => 'SELECT 1'
-            String query = filter
-                    .map(whereClause -> "SELECT * FROM (" + bigQueryQueryRelationHandle.getQuery() + " ) WHERE " + whereClause)
-                    .orElseGet(bigQueryQueryRelationHandle::getQuery);
-
-            if (emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())) {
-                String sql = "SELECT COUNT(*) FROM (" + query + ")";
-                return new FixedSplitSource(createEmptyProjection(session, sql));
-            }
-
-            if (!useStorageApi) {
-                log.debug("Using Rest API for running query: %s", query);
-                return new FixedSplitSource(BigQuerySplit.forViewStream(columns, filter));
-            }
-
-            TableId destinationTable = bigQueryQueryRelationHandle.getDestinationTableName().toTableId();
-            TableInfo tableInfo = new ViewMaterializationCache.DestinationTableBuilder(bigQueryClientFactory.create(session), viewExpiration, query, destinationTable).get();
-
-            log.debug("Using Storage API for running query: %s", query);
-            // filter is already used while constructing the select query
-            ReadSession readSession = createReadSession(session, tableInfo.getTableId(), ImmutableList.copyOf(columns), Optional.empty());
-            return new FixedSplitSource(readSession.getStreamsList().stream()
-                    .map(stream -> BigQuerySplit.forStream(stream.getName(), getSchemaAsString(readSession), columns, OptionalInt.of(stream.getSerializedSize())))
-                    .collect(toImmutableList()));
-        }
-
-        TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
-        TableDefinition.Type tableType = TableDefinition.Type.valueOf(bigQueryTableHandle.asPlainTable().getType());
-        List<BigQuerySplit> splits = emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())
-                ? createEmptyProjection(session, tableType, remoteTableId, filter)
-                : readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint);
-        return new FixedSplitSource(splits);
-    }
-
-    private static boolean emptyProjectionIsRequired(Optional<List<BigQueryColumnHandle>> projectedColumns)
-    {
-        return projectedColumns.isPresent() && projectedColumns.get().isEmpty();
-    }
-
-    private List<BigQuerySplit> readFromBigQuery(
-            ConnectorSession session,
-            TableDefinition.Type type,
-            TableId remoteTableId,
-            Optional<List<BigQueryColumnHandle>> projectedColumns,
-            TupleDomain<ColumnHandle> tableConstraint)
-    {
-        checkArgument(projectedColumns.isPresent() && projectedColumns.get().size() > 0, "Projected column is empty");
-        Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
-
-        log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, filter=[%s])", remoteTableId, projectedColumns, filter);
-        List<BigQueryColumnHandle> columns = projectedColumns.get();
-        List<String> projectedColumnsNames = getProjectedColumnNames(columns);
-        ImmutableList.Builder<BigQueryColumnHandle> projectedColumnHandles = ImmutableList.builder();
-        projectedColumnHandles.addAll(columns);
-
-        if (isWildcardTable(type, remoteTableId.getTable())) {
-            // Storage API doesn't support reading wildcard tables
-            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
-        }
-        if (type == EXTERNAL) {
-            // Storage API doesn't support reading external tables
-            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
-        }
-        if (type == VIEW || type == MATERIALIZED_VIEW) {
-            if (isSkipViewMaterialization(session)) {
-                return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
-            }
-            tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream()
-                    .map(BigQueryColumnHandle.class::cast)
-                    .filter(column -> !projectedColumnsNames.contains(column.name()))
-                    .forEach(projectedColumnHandles::add));
-        }
-        ReadSession readSession = createReadSession(session, remoteTableId, ImmutableList.copyOf(projectedColumnHandles.build()), filter);
-
-        String schemaString = getSchemaAsString(readSession);
-        return readSession.getStreamsList().stream()
-                .map(stream -> BigQuerySplit.forStream(stream.getName(), schemaString, columns, OptionalInt.of(stream.getSerializedSize())))
-                .collect(toImmutableList());
-    }
-
-    @VisibleForTesting
-    ReadSession createReadSession(ConnectorSession session, TableId remoteTableId, List<BigQueryColumnHandle> columns, Optional<String> filter)
-    {
-        ReadSessionCreator readSessionCreator = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, arrowSerializationEnabled, viewExpiration, maxReadRowsRetries);
-        return readSessionCreator.create(session, remoteTableId, columns, filter, nodeManager.getRequiredWorkerNodes().size());
-    }
-
-    private static List<String> getProjectedColumnNames(List<BigQueryColumnHandle> columns)
-    {
-        return columns.stream().map(BigQueryColumnHandle::name).collect(toImmutableList());
-    }
-
-    private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableDefinition.Type tableType, TableId remoteTableId, Optional<String> filter)
-    {
-        if (!TABLE_TYPES.containsKey(tableType)) {
-            throw new TrinoException(NOT_SUPPORTED, "Unsupported table type: " + tableType);
-        }
-
-        // Note that we cannot use row count from TableInfo because for writes via insertAll/streaming API the number is incorrect until the streaming buffer is flushed
-        // (and there's no mechanism to trigger an on-demand flush). This can lead to incorrect results for queries with empty projections.
-        String sql = selectSql(remoteTableId, "COUNT(*)", filter);
-        return createEmptyProjection(session, sql);
-    }
-
-    private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, String sql)
-    {
-        BigQueryClient client = bigQueryClientFactory.create(session);
-        log.debug("createEmptyProjection(sql=%s)", sql);
-        try {
-            TableResult result = client.executeQuery(session, sql);
-            long numberOfRows = getOnlyElement(getOnlyElement(result.iterateAll())).getLongValue();
-
-            return ImmutableList.of(BigQuerySplit.emptyProjection(numberOfRows));
-        }
-        catch (BigQueryException e) {
-            throw new TrinoException(BIGQUERY_FAILED_TO_EXECUTE_QUERY, "Failed to compute empty projection", e);
-        }
-    }
-
-    private String getSchemaAsString(ReadSession readSession)
-    {
-        if (arrowSerializationEnabled) {
-            return deserializeArrowSchema(readSession.getArrowSchema().getSerializedSchema());
-        }
-        return readSession.getAvroSchema().getSchema();
-    }
-
-    private static String deserializeArrowSchema(ByteString serializedSchema)
-    {
-        try {
-            return deserializeSchema(new ReadChannel(new ByteArrayReadableSeekableByteChannel(serializedSchema.toByteArray())))
-                    .toJson();
-        }
-        catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        return new BigQuerySplitSource(
+                session,
+                (BigQueryTableHandle) table,
+                bigQueryClientFactory,
+                bigQueryReadClientFactory,
+                viewEnabled,
+                arrowSerializationEnabled,
+                viewExpiration,
+                nodeManager,
+                maxReadRowsRetries);
     }
 }
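`getSplits` previously did all of the BigQuery work — materializing views, creating read sessions — before returning a `FixedSplitSource`; it now just hands back a `BigQuerySplitSource` (new file below), so nothing touches BigQuery until the engine requests the first split batch. A generic sketch of that deferral, with the `loader` standing in for the connector's split enumeration:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static java.util.concurrent.CompletableFuture.completedFuture;

public class LazyBatchSource<T>
{
    private final Supplier<List<T>> loader; // e.g. () -> getSplits(session, table)
    private List<T> items;
    private int offset;

    public LazyBatchSource(Supplier<List<T>> loader)
    {
        this.loader = loader;
    }

    public CompletableFuture<List<T>> getNextBatch(int maxSize)
    {
        if (items == null) {
            items = loader.get(); // the first call pays the enumeration cost
        }
        int next = Math.min(items.size(), offset + maxSize);
        List<T> batch = List.copyOf(items.subList(offset, next));
        offset = next;
        return completedFuture(batch);
    }

    public boolean isFinished()
    {
        return items != null && offset >= items.size();
    }
}
```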
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitSource.java
new file mode 100644
index 000000000000..0aa69c320302
--- /dev/null
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitSource.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.bigquery;
+
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import io.airlift.log.Logger;
+import io.airlift.units.Duration;
+import io.trino.spi.NodeManager;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnHandle;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.ConnectorSplitSource;
+import io.trino.spi.predicate.TupleDomain;
+import jakarta.annotation.Nullable;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.concurrent.CompletableFuture;
+
+import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
+import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
+import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
+import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
+import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
+import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
+import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.arrow.vector.ipc.message.MessageSerializer.deserializeSchema;
+
+public class BigQuerySplitSource
+        implements ConnectorSplitSource
+{
+    private static final Logger log = Logger.get(BigQuerySplitSource.class);
+
+    private final ConnectorSession session;
+    private final BigQueryTableHandle table;
+    private final BigQueryClientFactory bigQueryClientFactory;
+    private final BigQueryReadClientFactory bigQueryReadClientFactory;
+    private final boolean viewEnabled;
+    private final boolean arrowSerializationEnabled;
+    private final Duration viewExpiration;
+    private final NodeManager nodeManager;
+    private final int maxReadRowsRetries;
+
+    @Nullable
+    private List<BigQuerySplit> splits;
+    private int offset;
+
+    public BigQuerySplitSource(ConnectorSession session,
+            BigQueryTableHandle table,
+            BigQueryClientFactory bigQueryClientFactory,
+            BigQueryReadClientFactory bigQueryReadClientFactory,
+            boolean viewEnabled,
+            boolean arrowSerializationEnabled,
+            Duration viewExpiration,
+            NodeManager nodeManager,
+            int maxReadRowsRetries)
+    {
+        this.session = requireNonNull(session, "session is null");
+        this.table = requireNonNull(table, "table is null");
+        this.bigQueryClientFactory = requireNonNull(bigQueryClientFactory, "bigQueryClientFactory cannot be null");
+        this.bigQueryReadClientFactory = requireNonNull(bigQueryReadClientFactory, "bigQueryReadClientFactory cannot be null");
+        this.viewEnabled = viewEnabled;
+        this.arrowSerializationEnabled = arrowSerializationEnabled;
+        this.viewExpiration = requireNonNull(viewExpiration, "viewExpiration is null");
+        this.nodeManager = requireNonNull(nodeManager, "nodeManager cannot be null");
+        this.maxReadRowsRetries = maxReadRowsRetries;
+    }
+
+    @Override
+    public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
+    {
+        if (splits == null) {
+            splits = getSplits(session, table);
+        }
+
+        return completedFuture(new ConnectorSplitBatch(prepareNextBatch(maxSize), isFinished()));
+    }
+
+    private List<ConnectorSplit> prepareNextBatch(int maxSize)
+    {
+        requireNonNull(splits, "splits is null");
+        int nextOffset = Math.min(splits.size(), offset + maxSize);
+        List<ConnectorSplit> results = splits.subList(offset, nextOffset).stream()
+                .map(ConnectorSplit.class::cast)
+                .collect(toImmutableList());
+        offset = nextOffset;
+        return results;
+    }
+
+    @Override
+    public boolean isFinished()
+    {
+        return splits != null && offset >= splits.size();
+    }
+
+    @Override
+    public void close()
+    {
+        splits = null;
+    }
+
+    private List<BigQuerySplit> getSplits(
+            ConnectorSession session,
+            BigQueryTableHandle bigQueryTableHandle)
+    {
+        TupleDomain<ColumnHandle> tableConstraint = bigQueryTableHandle.constraint();
+        Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
+
+        if (bigQueryTableHandle.isQueryRelation()) {
+            BigQueryQueryRelationHandle bigQueryQueryRelationHandle = bigQueryTableHandle.getRequiredQueryRelation();
+            List<BigQueryColumnHandle> columns = bigQueryTableHandle.projectedColumns().orElse(ImmutableList.of());
+            boolean useStorageApi = bigQueryQueryRelationHandle.isUseStorageApi();
+
+            // projectedColumnsNames can not be used for generating select sql because the query fails if it does not
+            // include a column name. eg: query => 'SELECT 1'
+            String query = filter
+                    .map(whereClause -> "SELECT * FROM (" + bigQueryQueryRelationHandle.getQuery() + " ) WHERE " + whereClause)
+                    .orElseGet(bigQueryQueryRelationHandle::getQuery);
+
+            if (emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())) {
+                String sql = "SELECT COUNT(*) FROM (" + query + ")";
+                return createEmptyProjection(session, sql);
+            }
+
+            if (!useStorageApi) {
+                log.debug("Using Rest API for running query: %s", query);
+                return List.of(BigQuerySplit.forViewStream(columns, filter));
+            }
+
+            TableId destinationTable = bigQueryQueryRelationHandle.getDestinationTableName().toTableId();
+            TableInfo tableInfo = new ViewMaterializationCache.DestinationTableBuilder(bigQueryClientFactory.create(session), viewExpiration, query, destinationTable).get();
+
+            log.debug("Using Storage API for running query: %s", query);
+            // filter is already used while constructing the select query
+            ReadSession readSession = createReadSession(session, tableInfo.getTableId(), ImmutableList.copyOf(columns), Optional.empty());
+            return readSession.getStreamsList().stream()
+                    .map(stream -> BigQuerySplit.forStream(stream.getName(), getSchemaAsString(readSession), columns, OptionalInt.of(stream.getSerializedSize())))
+                    .collect(toImmutableList());
+        }
+
+        TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
+        TableDefinition.Type tableType = TableDefinition.Type.valueOf(bigQueryTableHandle.asPlainTable().getType());
+        return emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())
+                ? createEmptyProjection(session, tableType, remoteTableId, filter)
+                : readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint);
+    }
+
+    private static boolean emptyProjectionIsRequired(Optional<List<BigQueryColumnHandle>> projectedColumns)
+    {
+        return projectedColumns.isPresent() && projectedColumns.get().isEmpty();
+    }
+
+    private List<BigQuerySplit> readFromBigQuery(
+            ConnectorSession session,
+            TableDefinition.Type type,
+            TableId remoteTableId,
+            Optional<List<BigQueryColumnHandle>> projectedColumns,
+            TupleDomain<ColumnHandle> tableConstraint)
+    {
+        checkArgument(projectedColumns.isPresent() && projectedColumns.get().size() > 0, "Projected column is empty");
+        Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
+
+        log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, filter=[%s])", remoteTableId, projectedColumns, filter);
+        List<BigQueryColumnHandle> columns = projectedColumns.get();
+        List<String> projectedColumnsNames = getProjectedColumnNames(columns);
+        ImmutableList.Builder<BigQueryColumnHandle> projectedColumnHandles = ImmutableList.builder();
+        projectedColumnHandles.addAll(columns);
+
+        if (isWildcardTable(type, remoteTableId.getTable())) {
+            // Storage API doesn't support reading wildcard tables
+            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
+        }
+        if (type == EXTERNAL) {
+            // Storage API doesn't support reading external tables
+            return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
+        }
+        if (type == VIEW || type == MATERIALIZED_VIEW) {
+            if (isSkipViewMaterialization(session)) {
+                return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
+            }
+            tableConstraint.getDomains().ifPresent(domains -> domains.keySet().stream()
+                    .map(BigQueryColumnHandle.class::cast)
+                    .filter(column -> !projectedColumnsNames.contains(column.name()))
+                    .forEach(projectedColumnHandles::add));
+        }
+        ReadSession readSession = createReadSession(session, remoteTableId, ImmutableList.copyOf(projectedColumnHandles.build()), filter);
+
+        String schemaString = getSchemaAsString(readSession);
+        return readSession.getStreamsList().stream()
+                .map(stream -> BigQuerySplit.forStream(stream.getName(), schemaString, columns, OptionalInt.of(stream.getSerializedSize())))
+                .collect(toImmutableList());
+    }
+
+    @VisibleForTesting
+    ReadSession createReadSession(ConnectorSession session, TableId remoteTableId, List<BigQueryColumnHandle> columns, Optional<String> filter)
+    {
+        ReadSessionCreator readSessionCreator = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, arrowSerializationEnabled, viewExpiration, maxReadRowsRetries);
+        return readSessionCreator.create(session, remoteTableId, columns, filter, nodeManager.getRequiredWorkerNodes().size());
+    }
+
+    private static List<String> getProjectedColumnNames(List<BigQueryColumnHandle> columns)
+    {
+        return columns.stream().map(BigQueryColumnHandle::name).collect(toImmutableList());
+    }
+
+    private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableDefinition.Type tableType, TableId remoteTableId, Optional<String> filter)
+    {
+        if (!TABLE_TYPES.containsKey(tableType)) {
+            throw new TrinoException(NOT_SUPPORTED, "Unsupported table type: " + tableType);
+        }
+
+        // Note that we cannot use row count from TableInfo because for writes via insertAll/streaming API the number is incorrect until the streaming buffer is flushed
+        // (and there's no mechanism to trigger an on-demand flush). This can lead to incorrect results for queries with empty projections.
+        String sql = selectSql(remoteTableId, "COUNT(*)", filter);
+        return createEmptyProjection(session, sql);
+    }
+
+    private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, String sql)
+    {
+        BigQueryClient client = bigQueryClientFactory.create(session);
+        log.debug("createEmptyProjection(sql=%s)", sql);
+        try {
+            TableResult result = client.executeQuery(session, sql);
+            long numberOfRows = getOnlyElement(getOnlyElement(result.iterateAll())).getLongValue();
+
+            return ImmutableList.of(BigQuerySplit.emptyProjection(numberOfRows));
+        }
+        catch (BigQueryException e) {
+            throw new TrinoException(BIGQUERY_FAILED_TO_EXECUTE_QUERY, "Failed to compute empty projection", e);
+        }
+    }
+
+    private String getSchemaAsString(ReadSession readSession)
+    {
+        if (arrowSerializationEnabled) {
+            return deserializeArrowSchema(readSession.getArrowSchema().getSerializedSchema());
+        }
+        return readSession.getAvroSchema().getSchema();
+    }
+
+    private static String deserializeArrowSchema(ByteString serializedSchema)
+    {
+        try {
+            return deserializeSchema(new ReadChannel(new ByteArrayReadableSeekableByteChannel(serializedSchema.toByteArray())))
+                    .toJson();
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+}
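For empty projections (`SELECT count(*)`-style queries), the split source runs a `COUNT(*)` against BigQuery and fabricates a row-count-only split rather than trusting `TableInfo`'s row count, which lags while rows sit in the streaming buffer. A sketch of reading that single scalar with the plain client — the table name is hypothetical, and the single-row access mirrors the `getOnlyElement(...)` pattern above:

```java
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;

import java.util.Iterator;

public class EmptyProjectionSketch
{
    public static void main(String[] args)
            throws InterruptedException
    {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        Iterator<FieldValueList> rows = bigQuery.query(QueryJobConfiguration.of("SELECT COUNT(*) FROM `project.dataset.table`"))
                .iterateAll()
                .iterator();
        long numberOfRows = rows.next().get(0).getLongValue(); // exactly one row with one column
        System.out.println(numberOfRows);
    }
}
```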
read from %s", split.getStreamName()); responses = new ReadRowsHelper(bigQueryReadClient, split.getStreamName(), maxReadRowsRetries).readRows(); + nextResponse = CompletableFuture.supplyAsync(this::getResponse, executor); this.streamBufferAllocator = allocator.newChildAllocator(split.getStreamName(), 1024, Long.MAX_VALUE); this.bigQueryArrowToPageConverter = new BigQueryArrowToPageConverter(typeManager, streamBufferAllocator, schema, columns); this.pageBuilder = new PageBuilder(columns.stream() @@ -86,40 +99,43 @@ public long getCompletedBytes() @Override public long getReadTimeNanos() { - return 0; + return readTimeNanos.get(); } @Override public boolean isFinished() { - return !responses.hasNext(); + return finished; } @Override public Page getNextPage() { checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page"); - if (!responses.hasNext()) { + ReadRowsResponse response; + try { + response = getFutureValue(nextResponse); + } + catch (NoSuchElementException ignored) { + finished = true; return null; } - ReadRowsResponse response = responses.next(); + nextResponse = CompletableFuture.supplyAsync(this::getResponse, executor); + long start = System.nanoTime(); try (ArrowRecordBatch batch = deserializeResponse(streamBufferAllocator, response)) { bigQueryArrowToPageConverter.convert(pageBuilder, batch); } Page page = pageBuilder.build(); pageBuilder.reset(); + readTimeNanos.addAndGet(System.nanoTime() - start); return page; } @Override public long getMemoryUsage() { - long memoryUsage = streamBufferAllocator.getAllocatedMemory(); - if (split.getDataSize().isPresent()) { - memoryUsage += split.getDataSize().getAsInt() + pageBuilder.getRetainedSizeInBytes(); - } - return memoryUsage; + return streamBufferAllocator.getAllocatedMemory() + pageBuilder.getRetainedSizeInBytes(); } @Override @@ -127,14 +143,29 @@ public void close() { streamBufferAllocator.close(); bigQueryArrowToPageConverter.close(); + nextResponse.cancel(true); bigQueryReadClient.close(); } + @Override + public CompletableFuture isBlocked() + { + return nextResponse; + } + + private ReadRowsResponse getResponse() + { + long start = System.nanoTime(); + ReadRowsResponse response = responses.next(); + readTimeNanos.addAndGet(System.nanoTime() - start); + return response; + } + private ArrowRecordBatch deserializeResponse(BufferAllocator allocator, ReadRowsResponse response) { int serializedSize = response.getArrowRecordBatch().getSerializedSize(); long totalReadSize = readBytes.addAndGet(serializedSize); - log.debug("Read %d bytes (total %d) from %s", serializedSize, totalReadSize, split.getStreamName()); + log.debug("Read %d bytes (total %d) from %s", serializedSize, totalReadSize, streamName); try { return MessageSerializer.deserializeRecordBatch(readChannelForByteString(response.getArrowRecordBatch().getSerializedRecordBatch()), allocator); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java index a24b2732b7a9..bc8cdb43b396 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java @@ -50,12 +50,16 @@ import java.nio.ByteBuffer; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import 
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java
index a24b2732b7a9..bc8cdb43b396 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryStorageAvroPageSource.java
@@ -50,12 +50,16 @@
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.concurrent.MoreFutures.getFutureValue;
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.trino.plugin.bigquery.BigQueryTypeManager.toTrinoTimestamp;
 import static io.trino.plugin.bigquery.BigQueryUtil.toBigQueryColumnName;
@@ -85,31 +89,51 @@ public class BigQueryStorageAvroPageSource
     private static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter();
 
     private final BigQueryReadClient bigQueryReadClient;
+    private final ExecutorService executor;
     private final BigQueryTypeManager typeManager;
-    private final BigQuerySplit split;
+    private final String streamName;
+    private final Schema avroSchema;
     private final List<BigQueryColumnHandle> columns;
-    private final AtomicLong readBytes;
+    private final AtomicLong readBytes = new AtomicLong();
+    private final AtomicLong readTimeNanos = new AtomicLong();
     private final PageBuilder pageBuilder;
     private final Iterator<ReadRowsResponse> responses;
 
+    private CompletableFuture<ReadRowsResponse> nextResponse;
+    private boolean finished;
+
     public BigQueryStorageAvroPageSource(
             BigQueryReadClient bigQueryReadClient,
+            ExecutorService executor,
             BigQueryTypeManager typeManager,
             int maxReadRowsRetries,
             BigQuerySplit split,
             List<BigQueryColumnHandle> columns)
     {
         this.bigQueryReadClient = requireNonNull(bigQueryReadClient, "bigQueryReadClient is null");
+        this.executor = requireNonNull(executor, "executor is null");
         this.typeManager = requireNonNull(typeManager, "typeManager is null");
-        this.split = requireNonNull(split, "split is null");
-        this.readBytes = new AtomicLong();
+        requireNonNull(split, "split is null");
+        this.streamName = split.getStreamName();
+        this.avroSchema = parseSchema(split.getSchemaString());
         this.columns = requireNonNull(columns, "columns is null");
         this.pageBuilder = new PageBuilder(columns.stream()
                 .map(BigQueryColumnHandle::trinoType)
                 .collect(toImmutableList()));
-        log.debug("Starting to read from %s", split.getStreamName());
-        responses = new ReadRowsHelper(bigQueryReadClient, split.getStreamName(), maxReadRowsRetries).readRows();
+        log.debug("Starting to read from %s", streamName);
+        responses = new ReadRowsHelper(bigQueryReadClient, streamName, maxReadRowsRetries).readRows();
+        nextResponse = CompletableFuture.supplyAsync(this::getResponse, executor);
+    }
+
+    private Schema parseSchema(String schemaString)
+    {
+        try {
+            return new Schema.Parser().parse(schemaString);
+        }
+        catch (SchemaParseException e) {
+            throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid Avro schema: " + firstNonNull(e.getMessage(), e), e);
+        }
     }
 
     @Override
@@ -121,23 +145,29 @@ public long getCompletedBytes()
     @Override
     public long getReadTimeNanos()
     {
-        return 0;
+        return readTimeNanos.get();
     }
 
     @Override
     public boolean isFinished()
     {
-        return !responses.hasNext();
+        return finished;
     }
 
     @Override
     public Page getNextPage()
     {
         checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page");
-        if (!responses.hasNext()) {
+        ReadRowsResponse response;
+        try {
+            response = getFutureValue(nextResponse);
+        }
+        catch (NoSuchElementException ignored) {
+            finished = true;
             return null;
         }
-        ReadRowsResponse response = responses.next();
+        nextResponse = CompletableFuture.supplyAsync(this::getResponse, executor);
+        long start = System.nanoTime();
         Iterable<GenericRecord> records = parse(response);
         for (GenericRecord record : records) {
             pageBuilder.declarePosition();
@@ -150,6 +180,7 @@ public Page getNextPage()
 
         Page page = pageBuilder.build();
         pageBuilder.reset();
+        readTimeNanos.addAndGet(System.nanoTime() - start);
         return page;
     }
 
@@ -295,31 +326,35 @@ private void writeRow(RowBlockBuilder output, RowType rowType, GenericRecord record)
     @Override
     public long getMemoryUsage()
     {
-        if (split.getDataSize().isPresent()) {
-            return split.getDataSize().getAsInt() + pageBuilder.getRetainedSizeInBytes();
-        }
-
-        return 0;
+        return pageBuilder.getRetainedSizeInBytes();
     }
 
     @Override
     public void close()
     {
+        nextResponse.cancel(true);
         bigQueryReadClient.close();
     }
 
+    @Override
+    public CompletableFuture<?> isBlocked()
+    {
+        return nextResponse;
+    }
+
+    private ReadRowsResponse getResponse()
+    {
+        long start = System.nanoTime();
+        ReadRowsResponse response = responses.next();
+        readTimeNanos.addAndGet(System.nanoTime() - start);
+        return response;
+    }
+
     Iterable<GenericRecord> parse(ReadRowsResponse response)
     {
         byte[] buffer = response.getAvroRows().getSerializedBinaryRows().toByteArray();
         readBytes.addAndGet(buffer.length);
-        log.debug("Read %d bytes (total %d) from %s", buffer.length, readBytes.get(), split.getStreamName());
-        Schema avroSchema;
-        try {
-            avroSchema = new Schema.Parser().parse(split.getSchemaString());
-        }
-        catch (SchemaParseException e) {
-            throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid Avro schema: " + firstNonNull(e.getMessage(), e), e);
-        }
+        log.debug("Read %d bytes (total %d) from %s", buffer.length, readBytes.get(), streamName);
         return () -> new AvroBinaryIterator(avroSchema, buffer);
     }
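Besides the same single-slot prefetch rework as the Arrow reader, the Avro page source now parses the split's schema string once in the constructor instead of on every `ReadRowsResponse` — `new Schema.Parser().parse(...)` is pure repeated overhead when the schema never changes. A sketch of the parse-once, decode-many shape with stock Avro APIs; the row-block framing here is simplified relative to the Storage API's:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;

public class AvroBatchDecoder
{
    private final GenericDatumReader<GenericRecord> reader; // schema parsed once, reused per batch

    public AvroBatchDecoder(String schemaJson)
    {
        Schema schema = new Schema.Parser().parse(schemaJson);
        this.reader = new GenericDatumReader<>(schema);
    }

    // Decodes one serialized row block, as delivered in ReadRowsResponse.getAvroRows()
    public void decode(byte[] buffer)
            throws IOException
    {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(buffer, null);
        while (!decoder.isEnd()) {
            GenericRecord record = reader.read(null, decoder);
            System.out.println(record);
        }
    }
}
```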
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ForBigQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ForBigQueryPageSource.java
new file mode 100644
index 000000000000..80b3448b330a
--- /dev/null
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ForBigQueryPageSource.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.bigquery;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@BindingAnnotation
+public @interface ForBigQueryPageSource {}
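`ForBigQueryPageSource` is a standard Guice `@BindingAnnotation`, so requesting `ExecutorService` with this annotation at an injection point unambiguously selects the cached pool provided in `BigQueryConnectorModule` and leaves any other `ExecutorService` bindings untouched. A minimal usage sketch — the class name here is hypothetical:

```java
import com.google.inject.Inject;

import java.util.concurrent.ExecutorService;

import static java.util.Objects.requireNonNull;

public class PageSourceDependencySketch
{
    private final ExecutorService executor;

    @Inject
    public PageSourceDependencySketch(@ForBigQueryPageSource ExecutorService executor)
    {
        // Guice matches the (annotation, type) pair to the @Provides method in BigQueryConnectorModule.
        this.executor = requireNonNull(executor, "executor is null");
    }
}
```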
diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java
index 2e0175d79aef..4b0625c85a43 100644
--- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java
+++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQuerySplitManager.java
@@ -20,7 +20,10 @@
 import io.trino.spi.connector.Connector;
 import io.trino.spi.connector.ConnectorMetadata;
 import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.Constraint;
+import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.predicate.NullableValue;
 import io.trino.spi.predicate.TupleDomain;
@@ -76,13 +79,13 @@ void testBigQueryMaterializedView()
         try {
             BigQueryTableHandle table = (BigQueryTableHandle) metadata.getTableHandle(session, new SchemaTableName("test", materializedView), Optional.empty(), Optional.empty());
-            ReadSession readSession = createReadSession(session, table);
+            ReadSession readSession = createReadSession(transaction, session, table);
             assertThat(readSession.getTable()).contains(TEMP_TABLE_PREFIX);
 
             // Ignore constraints when creating temporary tables by default (view_materialization_with_filter is false)
             BigQueryColumnHandle column = new BigQueryColumnHandle("cnt", ImmutableList.of(), BIGINT, INT64, true, REQUIRED, ImmutableList.of(), null, false);
             BigQueryTableHandle tableDifferentFilter = new BigQueryTableHandle(table.relationHandle(), TupleDomain.fromFixedValues(ImmutableMap.of(column, new NullableValue(BIGINT, 0L))), table.projectedColumns());
-            assertThat(createReadSession(session, tableDifferentFilter).getTable())
+            assertThat(createReadSession(transaction, session, tableDifferentFilter).getTable())
                     .isEqualTo(readSession.getTable());
 
             // Don't reuse the same temporary table when view_materialization_with_filter is true
@@ -90,12 +93,12 @@
                     .setPropertyMetadata(new BigQuerySessionProperties(new BigQueryConfig()).getSessionProperties())
                     .setPropertyValues(ImmutableMap.of("view_materialization_with_filter", true))
                     .build();
-            String temporaryTableWithFilter = createReadSession(viewMaterializationWithFilter, tableDifferentFilter).getTable();
+            String temporaryTableWithFilter = createReadSession(transaction, viewMaterializationWithFilter, tableDifferentFilter).getTable();
             assertThat(temporaryTableWithFilter)
                     .isNotEqualTo(readSession.getTable());
 
             // Reuse the same temporary table when the filters are identical
-            assertThat(createReadSession(viewMaterializationWithFilter, tableDifferentFilter).getTable())
+            assertThat(createReadSession(transaction, viewMaterializationWithFilter, tableDifferentFilter).getTable())
                     .isEqualTo(temporaryTableWithFilter);
         }
         finally {
@@ -103,14 +106,16 @@ void testBigQueryMaterializedView()
         }
     }
 
-    private ReadSession createReadSession(ConnectorSession session, BigQueryTableHandle table)
+    private ReadSession createReadSession(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table)
     {
         BigQuerySplitManager splitManager = (BigQuerySplitManager) connector.getSplitManager();
-        return splitManager.createReadSession(
+        BigQuerySplitSource splitSource = (BigQuerySplitSource) splitManager.getSplits(transaction, session, table, DynamicFilter.EMPTY, Constraint.alwaysTrue());
+        BigQueryTableHandle bigQueryTable = (BigQueryTableHandle) table;
+        return splitSource.createReadSession(
                 session,
-                table.asPlainTable().getRemoteTableName().toTableId(),
-                table.projectedColumns().orElseThrow(),
-                buildFilter(table.constraint()));
+                bigQueryTable.asPlainTable().getRemoteTableName().toTableId(),
+                bigQueryTable.projectedColumns().orElseThrow(),
+                buildFilter(bigQueryTable.constraint()));
     }
 
     private void onBigQuery(@Language("SQL") String sql)