diff --git a/pom.xml b/pom.xml
index 0614d0e2317..7bb3bde38b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
presto-elasticsearch
presto-iceberg
presto-google-sheets
+ presto-bigquery
diff --git a/presto-bigquery/pom.xml b/presto-bigquery/pom.xml
new file mode 100644
index 00000000000..fcb89eb8c4b
--- /dev/null
+++ b/presto-bigquery/pom.xml
@@ -0,0 +1,287 @@
+
+
+ 4.0.0
+
+
+ io.prestosql
+ presto-root
+ 331-SNAPSHOT
+
+
+ presto-bigquery
+ Presto - BigQuery Connector
+ presto-plugin
+
+
+ ${project.parent.basedir}
+ 1.49.1
+ 0.18.0
+ 1.91.3
+
+
+
+
+
+ com.google.api.grpc
+ proto-google-common-protos
+ 1.17.0
+
+
+
+ io.grpc
+ grpc-bom
+ 1.24.1
+ pom
+ import
+
+
+
+
+
+
+
+ io.airlift
+ configuration
+
+
+
+ io.airlift
+ bootstrap
+
+
+
+ io.airlift
+ json
+
+
+
+ io.airlift
+ log
+
+
+
+ io.airlift
+ log-manager
+
+
+
+ com.google.api
+ gax
+ ${dep.gax.version}
+
+
+
+ com.google.api
+ gax-grpc
+ ${dep.gax.version}
+
+
+
+ io.grpc
+ grpc-api
+
+
+
+ com.google.auth
+ google-auth-library-credentials
+ ${dep.google-auth-library.version}
+
+
+
+ com.google.auth
+ google-auth-library-oauth2-http
+ ${dep.google-auth-library.version}
+
+
+
+ com.google.cloud
+ google-cloud-core
+ ${dep.google-cloud-core.version}
+
+
+
+ com.google.cloud
+ google-cloud-core-http
+ ${dep.google-cloud-core.version}
+
+
+
+ com.google.http-client
+ google-http-client
+ 1.32.1
+
+
+ commons-logging
+ commons-logging
+
+
+
+
+
+ com.google.api.grpc
+ proto-google-cloud-bigquerystorage-v1beta1
+ 0.84.0
+
+
+
+ com.google.cloud
+ google-cloud-bigquery
+ 1.101.0
+
+
+ com.google.guava
+ guava
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ commons-logging
+ commons-logging
+
+
+
+
+
+ com.google.cloud
+ google-cloud-bigquerystorage
+ 0.119.0-beta
+
+
+ com.google.guava
+ guava
+
+
+
+ commons-logging
+ commons-logging
+
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.10.0
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+
+
+
+ javax.validation
+ validation-api
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ org.apache.avro
+ avro
+
+
+
+
+ io.prestosql
+ presto-spi
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.mockito
+ mockito-core
+ 3.2.4
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ io.prestosql
+ presto-main
+ test
+
+
+
+ io.prestosql
+ presto-tpch
+ test
+
+
+
+ io.prestosql
+ presto-testing
+ test
+
+
+
+ io.prestosql
+ presto-tests
+ test
+
+
+
+ io.prestosql
+ presto-client
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+
+
+ **/TestBigQueryIntegrationSmokeTest.java
+
+
+
+
+
+
+
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryClient.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryClient.java
new file mode 100644
index 00000000000..4766e92f93d
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryClient.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.http.BaseHttpServiceException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.lang.String.format;
+import static java.util.Locale.ENGLISH;
+import static java.util.UUID.randomUUID;
+import static java.util.stream.Collectors.joining;
+
+// holds caches and mappings
+// presto converts the dataset and table names to lower case, while BigQuery is case sensitive
+// the mappings here keep the mappings
+class BigQueryClient
+{
+ private final BigQuery bigQuery;
+ private final Optional viewMaterializationProject;
+ private final Optional viewMaterializationDataset;
+ private final ConcurrentMap tableIds = new ConcurrentHashMap<>();
+ private final ConcurrentMap datasetIds = new ConcurrentHashMap<>();
+
+ BigQueryClient(BigQuery bigQuery, BigQueryConfig config)
+ {
+ this.bigQuery = bigQuery;
+ this.viewMaterializationProject = config.getViewMaterializationProject();
+ this.viewMaterializationDataset = config.getViewMaterializationDataset();
+ }
+
+ // return empty if no filters are used
+ private static Optional createWhereClause(String[] filters)
+ {
+ return Optional.empty();
+ }
+
+ TableInfo getTable(TableId tableId)
+ {
+ TableId bigQueryTableId = tableIds.get(tableId);
+ Table table = bigQuery.getTable(bigQueryTableId != null ? bigQueryTableId : tableId);
+ if (table != null) {
+ tableIds.putIfAbsent(tableId, table.getTableId());
+ datasetIds.putIfAbsent(toDatasetId(tableId), toDatasetId(table.getTableId()));
+ }
+ return table;
+ }
+
+ DatasetId toDatasetId(TableId tableId)
+ {
+ return DatasetId.of(tableId.getProject(), tableId.getDataset());
+ }
+
+ String getProjectId()
+ {
+ return bigQuery.getOptions().getProjectId();
+ }
+
+ Iterable listDatasets(String projectId)
+ {
+ final Iterator datasets = bigQuery.listDatasets(projectId).iterateAll().iterator();
+ return () -> Iterators.transform(datasets, this::addDataSetMappingIfNeeded);
+ }
+
+ Iterable listTables(DatasetId datasetId, TableDefinition.Type... types)
+ {
+ Set allowedTypes = ImmutableSet.copyOf(types);
+ DatasetId bigQueryDatasetId = datasetIds.getOrDefault(datasetId, datasetId);
+ Iterable allTables = bigQuery.listTables(bigQueryDatasetId).iterateAll();
+ return StreamSupport.stream(allTables.spliterator(), false)
+ .filter(table -> allowedTypes.contains(table.getDefinition().getType()))
+ .collect(toImmutableList());
+ }
+
+ private Dataset addDataSetMappingIfNeeded(Dataset dataset)
+ {
+ DatasetId bigQueryDatasetId = dataset.getDatasetId();
+ DatasetId prestoDatasetId = DatasetId.of(bigQueryDatasetId.getProject(), bigQueryDatasetId.getDataset().toLowerCase(ENGLISH));
+ datasetIds.putIfAbsent(prestoDatasetId, bigQueryDatasetId);
+ return dataset;
+ }
+
+ TableId createDestinationTable(TableId tableId)
+ {
+ String project = viewMaterializationProject.orElse(tableId.getProject());
+ String dataset = viewMaterializationDataset.orElse(tableId.getDataset());
+ DatasetId datasetId = mapIfNeeded(project, dataset);
+ UUID uuid = randomUUID();
+ String name = format("_pbc_%s", randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
+ return TableId.of(datasetId.getProject(), datasetId.getDataset(), name);
+ }
+
+ private DatasetId mapIfNeeded(String project, String dataset)
+ {
+ DatasetId datasetId = DatasetId.of(project, dataset);
+ return datasetIds.getOrDefault(datasetId, datasetId);
+ }
+
+ Table update(TableInfo table)
+ {
+ return bigQuery.update(table);
+ }
+
+ Job create(JobInfo jobInfo)
+ {
+ return bigQuery.create(jobInfo);
+ }
+
+ TableResult query(String sql)
+ {
+ try {
+ return bigQuery.query(QueryJobConfiguration.of(sql));
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e);
+ }
+ }
+
+ String createSql(TableId table, ImmutableList requiredColumns, String[] filters)
+ {
+ String columns = requiredColumns.isEmpty() ? "*" :
+ requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));
+
+ String whereClause = createWhereClause(filters)
+ .map(clause -> "WHERE " + clause)
+ .orElse("");
+
+ return createSql(table, columns, filters);
+ }
+
+ // assuming the SELECT part is properly formatted, can be used to call functions such as COUNT and SUM
+ String createSql(TableId table, String formatedQuery, String[] filters)
+ {
+ String tableName = fullTableName(table);
+
+ String whereClause = createWhereClause(filters)
+ .map(clause -> "WHERE " + clause)
+ .orElse("");
+
+ return format("SELECT %s FROM `%s` %s", formatedQuery, tableName, whereClause);
+ }
+
+ String fullTableName(TableId tableId)
+ {
+ tableId = tableIds.getOrDefault(tableId, tableId);
+ return format("%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryColumnHandle.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryColumnHandle.java
new file mode 100644
index 00000000000..4e6f4578b64
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryColumnHandle.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.bigquery.Field;
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ColumnMetadata;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class BigQueryColumnHandle
+ implements ColumnHandle, BigQueryType.Adaptor
+{
+ private final String name;
+ private final BigQueryType bigQueryType;
+ private final Field.Mode mode;
+ private final List subColumns;
+ private final String description;
+
+ @JsonCreator
+ public BigQueryColumnHandle(
+ @JsonProperty("name") String name,
+ @JsonProperty("bigQueryType") BigQueryType bigQueryType,
+ @JsonProperty("mode") Field.Mode mode,
+ @JsonProperty("subColumns") List subColumns,
+ @JsonProperty("description") String description)
+ {
+ this.name = requireNonNull(name, "column name cannot be null");
+ this.bigQueryType = requireNonNull(bigQueryType, () -> format("column type cannot be null for column [%s]", name));
+ this.mode = requireNonNull(mode, "Field mode cannot be null");
+ this.subColumns = ImmutableList.copyOf(requireNonNull(subColumns, "subColumns is null"));
+ this.description = description;
+ }
+
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ @JsonProperty
+ public BigQueryType getBigQueryType()
+ {
+ return bigQueryType;
+ }
+
+ @Override
+ public Map getBigQuerySubTypes()
+ {
+ return subColumns.stream().collect(toImmutableMap(BigQueryColumnHandle::getName, column -> column));
+ }
+
+ @Override
+ @JsonProperty
+ public Field.Mode getMode()
+ {
+ return mode;
+ }
+
+ @JsonProperty
+ public List getSubColumns()
+ {
+ return subColumns;
+ }
+
+ @JsonProperty
+ public String description()
+ {
+ return description;
+ }
+
+ public ColumnMetadata getColumnMetadata()
+ {
+ return ColumnMetadata.builder()
+ .setName(name)
+ .setType(getPrestoType())
+ .setComment(Optional.ofNullable(description))
+ .setNullable(mode == Field.Mode.NULLABLE)
+ .build();
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BigQueryColumnHandle that = (BigQueryColumnHandle) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(bigQueryType, that.bigQueryType) &&
+ Objects.equals(mode, that.mode) &&
+ Objects.equals(subColumns, that.subColumns) &&
+ Objects.equals(description, that.description);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, bigQueryType, mode, subColumns, description);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("name", name)
+ .add("type", bigQueryType)
+ .add("mode", mode)
+ .add("subColumns", subColumns)
+ .add("description", description)
+ .toString();
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConfig.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConfig.java
new file mode 100644
index 00000000000..d276bfa4d80
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConfig.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.cloud.bigquery.BigQueryOptions;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+import javax.validation.constraints.AssertTrue;
+import javax.validation.constraints.Min;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+public class BigQueryConfig
+{
+ public static final int DEFAULT_MAX_READ_ROWS_RETRIES = 3;
+ public static final String VIEWS_ENABLED = "bigquery.views-enabled";
+
+ private Optional credentialsKey = Optional.empty();
+ private Optional credentialsFile = Optional.empty();
+ private Optional projectId = Optional.empty();
+ private Optional parentProject = Optional.empty();
+ private OptionalInt parallelism = OptionalInt.empty();
+ private boolean viewsEnabled;
+ private Optional viewMaterializationProject = Optional.empty();
+ private Optional viewMaterializationDataset = Optional.empty();
+ private int maxReadRowsRetries = DEFAULT_MAX_READ_ROWS_RETRIES;
+
+ @AssertTrue(message = "Exactly one of 'bigquery.credentials-key' or 'bigquery.credentials-file' must be specified, or the default GoogleCredentials could be created")
+ public boolean isCredentialsConfigurationValid()
+ {
+ // only one of them (at most) should be present
+ if (credentialsKey.isPresent() && credentialsFile.isPresent()) {
+ return false;
+ }
+ // if no credentials were supplied, let's check if we can create the default ones
+ if (!credentialsKey.isPresent() && !credentialsFile.isPresent()) {
+ try {
+ GoogleCredentials.getApplicationDefault();
+ }
+ catch (IOException e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public Optional getCredentialsKey()
+ {
+ return credentialsKey;
+ }
+
+ @Config("bigquery.credentials-key")
+ @ConfigDescription("The base64 encoded credentials key")
+ public BigQueryConfig setCredentialsKey(String credentialsKey)
+ {
+ this.credentialsKey = Optional.of(credentialsKey);
+ return this;
+ }
+
+ public Optional getCredentialsFile()
+ {
+ return credentialsFile;
+ }
+
+ @Config("bigquery.credentials-file")
+ @ConfigDescription("The path to the JSON credentials file")
+ public BigQueryConfig setCredentialsFile(String credentialsFile)
+ {
+ this.credentialsFile = Optional.of(credentialsFile);
+ return this;
+ }
+
+ public Optional getProjectId()
+ {
+ return projectId;
+ }
+
+ @Config("bigquery.project-id")
+ @ConfigDescription("The Google Cloud Project ID where the data reside")
+ public BigQueryConfig setProjectId(String projectId)
+ {
+ this.projectId = Optional.of(projectId);
+ return this;
+ }
+
+ public String getParentProject()
+ {
+ return parentProject.orElseGet(() -> BigQueryOptions.getDefaultInstance().getProjectId());
+ }
+
+ @Config("bigquery.parent-project")
+ @ConfigDescription("The project ID Google Cloud Project to bill for the export")
+ public BigQueryConfig setParentProject(String parentProject)
+ {
+ this.parentProject = Optional.of(parentProject);
+ return this;
+ }
+
+ public OptionalInt getParallelism()
+ {
+ return parallelism;
+ }
+
+ @Config("bigquery.parallelism")
+ @ConfigDescription("The number of partitions to split the data into.")
+ public BigQueryConfig setParallelism(int parallelism)
+ {
+ this.parallelism = OptionalInt.of(parallelism);
+ return this;
+ }
+
+ public boolean isViewsEnabled()
+ {
+ return viewsEnabled;
+ }
+
+ @Config(VIEWS_ENABLED)
+ @ConfigDescription("Enables the connector to read from views and not only tables")
+ public BigQueryConfig setViewsEnabled(boolean viewsEnabled)
+ {
+ this.viewsEnabled = viewsEnabled;
+ return this;
+ }
+
+ public int getViewExpirationTimeInHours()
+ {
+ return 24;
+ }
+
+ public Optional getViewMaterializationProject()
+ {
+ return viewMaterializationProject;
+ }
+
+ @Config("bigquery.view-materialization-project")
+ @ConfigDescription("The project where the materialized view is going to be created")
+ public BigQueryConfig setViewMaterializationProject(String viewMaterializationProject)
+ {
+ this.viewMaterializationProject = Optional.of(viewMaterializationProject);
+ return this;
+ }
+
+ public Optional getViewMaterializationDataset()
+ {
+ return viewMaterializationDataset;
+ }
+
+ @Config("bigquery.view-materialization-dataset")
+ @ConfigDescription("The dataset where the materialized view is going to be created")
+ public BigQueryConfig setViewMaterializationDataset(String viewMaterializationDataset)
+ {
+ this.viewMaterializationDataset = Optional.of(viewMaterializationDataset);
+ return this;
+ }
+
+ @Min(0)
+ public int getMaxReadRowsRetries()
+ {
+ return maxReadRowsRetries;
+ }
+
+ @Config("bigquery.max-read-rows-retries")
+ @ConfigDescription("The number of retries in case of retryable server issues")
+ public BigQueryConfig setMaxReadRowsRetries(int maxReadRowsRetries)
+ {
+ this.maxReadRowsRetries = maxReadRowsRetries;
+ return this;
+ }
+
+ ReadSessionCreatorConfig createReadSessionCreatorConfig()
+ {
+ return new ReadSessionCreatorConfig(
+ getParentProject(),
+ isViewsEnabled(),
+ getViewMaterializationProject(),
+ getViewMaterializationProject(),
+ getViewExpirationTimeInHours(),
+ getMaxReadRowsRetries());
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnector.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnector.java
new file mode 100644
index 00000000000..1449b023c84
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnector.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import io.airlift.log.Logger;
+import io.prestosql.spi.connector.Connector;
+import io.prestosql.spi.connector.ConnectorMetadata;
+import io.prestosql.spi.connector.ConnectorPageSourceProvider;
+import io.prestosql.spi.connector.ConnectorSplitManager;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.transaction.IsolationLevel;
+
+import javax.inject.Inject;
+
+import static io.prestosql.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static io.prestosql.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class BigQueryConnector
+ implements Connector
+{
+ private static final Logger log = Logger.get(BigQueryConnector.class);
+
+ private final BigQueryMetadata metadata;
+ private final BigQuerySplitManager splitManager;
+ private final BigQueryPageSourceProvider pageSourceProvider;
+
+ @Inject
+ public BigQueryConnector(
+ BigQueryMetadata metadata,
+ BigQuerySplitManager splitManager,
+ BigQueryPageSourceProvider pageSourceProvider)
+ {
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+ {
+ log.debug("beginTransaction(isolationLevel=%s, readOnly=%s)", isolationLevel, readOnly);
+ checkConnectorSupports(READ_COMMITTED, isolationLevel);
+ return BigQueryTransactionHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnectorFactory.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnectorFactory.java
new file mode 100644
index 00000000000..50fa0d44dca
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnectorFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.json.JsonModule;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.connector.Connector;
+import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.connector.ConnectorFactory;
+import io.prestosql.spi.connector.ConnectorHandleResolver;
+import io.prestosql.spi.type.TypeManager;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class BigQueryConnectorFactory
+ implements ConnectorFactory
+{
+ @Override
+ public String getName()
+ {
+ return "bigquery";
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver()
+ {
+ return new BigQueryHandleResolver();
+ }
+
+ @Override
+ public Connector create(String catalogName, Map config, ConnectorContext context)
+ {
+ requireNonNull(catalogName, "catalogName is null");
+ requireNonNull(config, "config is null");
+
+ Bootstrap app = new Bootstrap(
+ new JsonModule(),
+ new BigQueryConnectorModule(context.getNodeManager()),
+ binder -> {
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ });
+
+ Injector injector = app.strictConfig()
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(config)
+ .initialize();
+
+ return injector.getInstance(BigQueryConnector.class);
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnectorModule.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnectorModule.java
new file mode 100644
index 00000000000..817b59e3c08
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryConnectorModule.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.api.gax.rpc.FixedHeaderProvider;
+import com.google.api.gax.rpc.HeaderProvider;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import io.prestosql.spi.NodeManager;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+
+public class BigQueryConnectorModule
+ implements Module
+{
+ private final NodeManager nodeManager;
+
+ public BigQueryConnectorModule(NodeManager nodeManager)
+ {
+ this.nodeManager = nodeManager;
+ }
+
+ @Provides
+ @Singleton
+ public static HeaderProvider createHeaderProvider(NodeManager nodeManager)
+ {
+ return FixedHeaderProvider.create("user-agent", "prestosql/" + nodeManager.getCurrentNode().getVersion());
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ // BigQuery related
+ binder.bind(BigQueryStorageClientFactory.class).in(Scopes.SINGLETON);
+
+ // Connector implementation
+ binder.bind(BigQueryConnector.class).in(Scopes.SINGLETON);
+
+ binder.bind(BigQueryMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(BigQuerySplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(BigQueryPageSourceProvider.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(BigQueryConfig.class);
+ }
+
+ @Provides
+ @Singleton
+ public BigQueryCredentialsSupplier provideBigQueryCredentialsSupplier(BigQueryConfig config)
+ {
+ return new BigQueryCredentialsSupplier(config.getCredentialsKey(), config.getCredentialsFile());
+ }
+
+ @Provides
+ @Singleton
+ public BigQueryClient provideBigQueryClient(BigQueryConfig config, HeaderProvider headerProvider, BigQueryCredentialsSupplier bigQueryCredentialsSupplier)
+ {
+ BigQueryOptions.Builder options = BigQueryOptions.newBuilder()
+ .setHeaderProvider(headerProvider)
+ .setProjectId(config.getParentProject());
+ // set credentials of provided
+ bigQueryCredentialsSupplier.getCredentials().ifPresent(options::setCredentials);
+ return new BigQueryClient(options.build().getService(), config);
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryCredentialsSupplier.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryCredentialsSupplier.java
new file mode 100644
index 00000000000..a0565b6bf5d
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryCredentialsSupplier.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.api.client.util.Base64;
+import com.google.auth.Credentials;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.Streams;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+class BigQueryCredentialsSupplier
+{
+ private final Optional credentialsKey;
+ private final Optional credentialsFile;
+ private final Supplier> credentialsCreator;
+
+ public BigQueryCredentialsSupplier(Optional credentialsKey, Optional credentialsFile)
+ {
+ this.credentialsKey = credentialsKey;
+ this.credentialsFile = credentialsFile;
+ // lazy creation, cache once it's created
+ this.credentialsCreator = Suppliers.memoize(() -> {
+ Optional credentialsFromKey = credentialsKey.map(BigQueryCredentialsSupplier::createCredentialsFromKey);
+ Optional credentialsFromFile = credentialsFile.map(BigQueryCredentialsSupplier::createCredentialsFromFile);
+ return Stream.of(credentialsFromKey, credentialsFromFile)
+ .flatMap(Streams::stream)
+ .findFirst();
+ });
+ }
+
+ private static Credentials createCredentialsFromKey(String key)
+ {
+ try {
+ return GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.decodeBase64(key)));
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException("Failed to create Credentials from key", e);
+ }
+ }
+
+ private static Credentials createCredentialsFromFile(String file)
+ {
+ try {
+ return GoogleCredentials.fromStream(new FileInputStream(file));
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException("Failed to create Credentials from file", e);
+ }
+ }
+
+ Optional getCredentials()
+ {
+ return credentialsCreator.get();
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryEmptyProjectionPageSource.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryEmptyProjectionPageSource.java
new file mode 100644
index 00000000000..656d9fe69a9
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryEmptyProjectionPageSource.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PageBuilder;
+import io.prestosql.spi.connector.ConnectorPageSource;
+
+import java.io.IOException;
+
+public class BigQueryEmptyProjectionPageSource
+ implements ConnectorPageSource
+{
+ private final long numberOfRows;
+ private boolean finished;
+
+ public BigQueryEmptyProjectionPageSource(long numberOfRows)
+ {
+ this.numberOfRows = numberOfRows;
+ this.finished = false;
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return finished;
+ }
+
+ @Override
+ public Page getNextPage()
+ {
+ PageBuilder pageBuilder = new PageBuilder(ImmutableList.of());
+ for (long i = 0; i < numberOfRows; i++) {
+ pageBuilder.declarePosition();
+ }
+ finished = true;
+ return pageBuilder.build();
+ }
+
+ @Override
+ public long getSystemMemoryUsage()
+ {
+ return 0;
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ // nothing to do
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryErrorCode.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryErrorCode.java
new file mode 100644
index 00000000000..e2661bf7ce6
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryErrorCode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import io.prestosql.spi.ErrorCode;
+import io.prestosql.spi.ErrorCodeSupplier;
+import io.prestosql.spi.ErrorType;
+
+import static io.prestosql.spi.ErrorType.EXTERNAL;
+
+public enum BigQueryErrorCode
+ implements ErrorCodeSupplier
+{
+ BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED(0, EXTERNAL),
+ BIGQUERY_DATETIME_PARSING_ERROR(1, EXTERNAL),
+ BIGQUERY_FAILED_TO_EXECUTE_QUERY(2, EXTERNAL);
+
+ private final ErrorCode errorCode;
+
+ BigQueryErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0509_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryHandleResolver.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryHandleResolver.java
new file mode 100644
index 00000000000..0ca9f36d218
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryHandleResolver.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorHandleResolver;
+import io.prestosql.spi.connector.ConnectorSplit;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+
+public class BigQueryHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return BigQueryTransactionHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return BigQueryTableHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return BigQueryColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return BigQuerySplit.class;
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryMetadata.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryMetadata.java
new file mode 100644
index 00000000000..17e6b750ac8
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryMetadata.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Streams;
+import io.airlift.log.Logger;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ColumnMetadata;
+import io.prestosql.spi.connector.ConnectorMetadata;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.ConnectorTableMetadata;
+import io.prestosql.spi.connector.ConnectorTableProperties;
+import io.prestosql.spi.connector.LimitApplicationResult;
+import io.prestosql.spi.connector.NotFoundException;
+import io.prestosql.spi.connector.ProjectionApplicationResult;
+import io.prestosql.spi.connector.ProjectionApplicationResult.Assignment;
+import io.prestosql.spi.connector.SchemaTableName;
+import io.prestosql.spi.connector.SchemaTablePrefix;
+import io.prestosql.spi.connector.TableNotFoundException;
+import io.prestosql.spi.expression.ConnectorExpression;
+
+import javax.inject.Inject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
+import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toMap;
+
+public class BigQueryMetadata
+ implements ConnectorMetadata
+{
+ static final int NUMERIC_DATA_TYPE_PRECISION = 38;
+ static final int NUMERIC_DATA_TYPE_SCALE = 9;
+ static final String INFORMATION_SCHEMA = "information_schema";
+ private static final Logger log = Logger.get(BigQueryMetadata.class);
+ private BigQueryClient bigQueryClient;
+ private String projectId;
+
+ @Inject
+ public BigQueryMetadata(BigQueryClient bigQueryClient, BigQueryConfig config)
+ {
+ this.bigQueryClient = bigQueryClient;
+ this.projectId = config.getProjectId().orElse(bigQueryClient.getProjectId());
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ log.debug("listSchemaNames(session=%s)", session);
+ return Streams.stream(bigQueryClient.listDatasets(projectId))
+ .map(dataset -> dataset.getDatasetId().getDataset())
+ .filter(schemaName -> !schemaName.equalsIgnoreCase(INFORMATION_SCHEMA))
+ .collect(toImmutableList());
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, Optional schemaName)
+ {
+ log.debug("listTables(session=%s, schemaName=%s)", session, schemaName);
+ return listTablesWithTypes(session, schemaName, TABLE);
+ }
+
+ @Override
+ public List listViews(ConnectorSession session, Optional schemaName)
+ {
+ log.debug("listViews(session=%s, schemaName=%s)", session, schemaName);
+ return listTablesWithTypes(session, schemaName, VIEW);
+ }
+
+ private List listTablesWithTypes(ConnectorSession session, Optional schemaName, TableDefinition.Type... types)
+ {
+ if (schemaName.isPresent() && schemaName.get().equalsIgnoreCase(INFORMATION_SCHEMA)) {
+ return ImmutableList.of();
+ }
+ Set schemaNames = schemaName.map(ImmutableSet::of)
+ .orElseGet(() -> ImmutableSet.copyOf(listSchemaNames(session)));
+
+ ImmutableList.Builder tableNames = ImmutableList.builder();
+ for (String datasetId : schemaNames) {
+ for (Table table : bigQueryClient.listTables(DatasetId.of(projectId, datasetId), types)) {
+ tableNames.add(new SchemaTableName(datasetId, table.getTableId().getTable()));
+ }
+ }
+ return tableNames.build();
+ }
+
+ ImmutableList collectAll(Page page)
+ {
+ return ImmutableList.copyOf(page.iterateAll());
+ }
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ log.debug("getTableHandle(session=%s, tableName=%s)", session, tableName);
+ TableInfo tableInfo = getBigQueryTable(tableName);
+ if (tableInfo == null) {
+ log.debug("Table [%s.%s] was not found", tableName.getSchemaName(), tableName.getTableName());
+ return null;
+ }
+ return BigQueryTableHandle.from(tableInfo);
+ }
+
+ // May return null
+ private TableInfo getBigQueryTable(SchemaTableName tableName)
+ {
+ return bigQueryClient.getTable(TableId.of(projectId, tableName.getSchemaName(), tableName.getTableName()));
+ }
+
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ ConnectorTableHandle table = getTableHandle(session, schemaTableName);
+ if (table == null) {
+ throw new TableNotFoundException(schemaTableName);
+ }
+ return getTableMetadata(session, table);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ log.debug("getTableMetadata(session=%s, tableHandle=%s)", session, tableHandle);
+ TableInfo table = bigQueryClient.getTable(((BigQueryTableHandle) tableHandle).getTableId());
+ SchemaTableName schemaTableName = new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable());
+ Schema schema = table.getDefinition().getSchema();
+ List columns = schema == null ?
+ ImmutableList.of() :
+ schema.getFields().stream()
+ .map(Conversions::toColumnMetadata)
+ .collect(toImmutableList());
+ return new ConnectorTableMetadata(schemaTableName, columns);
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ log.debug("getColumnHandles(session=%s, tableHandle=%s)", session, tableHandle);
+ TableInfo table = bigQueryClient.getTable(((BigQueryTableHandle) tableHandle).getTableId());
+ Schema schema = table.getDefinition().getSchema();
+ return schema == null ?
+ ImmutableMap.of() :
+ schema.getFields().stream().collect(toMap(Field::getName, Conversions::toColumnHandle));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(
+ ConnectorSession session,
+ ConnectorTableHandle tableHandle,
+ ColumnHandle columnHandle)
+ {
+ log.debug("getColumnMetadata(session=%s, tableHandle=%s, columnHandle=%s)", session, columnHandle, columnHandle);
+ return ((BigQueryColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ log.debug("listTableColumns(session=%s, prefix=%s)", session, prefix);
+ requireNonNull(prefix, "prefix is null");
+ ImmutableMap.Builder> columns = ImmutableMap.builder();
+ for (SchemaTableName tableName : listTables(session, prefix)) {
+ try {
+ columns.put(tableName, getTableMetadata(session, tableName).getColumns());
+ }
+ catch (NotFoundException e) {
+ // table disappeared during listing operation
+ }
+ }
+ return columns.build();
+ }
+
+ private List listTables(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ if (!prefix.getTable().isPresent()) {
+ return listTables(session, prefix.getSchema());
+ }
+ SchemaTableName tableName = prefix.toSchemaTableName();
+ TableInfo tableInfo = getBigQueryTable(tableName);
+ return tableInfo == null ?
+ ImmutableList.of() : // table does not exist
+ ImmutableList.of(tableName);
+ }
+
+ @Override
+ public boolean usesLegacyTableLayouts()
+ {
+ return false;
+ }
+
+ @Override
+ public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle table)
+ {
+ log.debug("getTableProperties(session=%s, prefix=%s)", session, table);
+ return new ConnectorTableProperties();
+ }
+
+ @Override
+ public Optional> applyLimit(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ long limit)
+ {
+ log.debug("applyLimit(session=%s, handle=%s, limit=%s)", session, handle, limit);
+ BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle;
+
+ if (bigQueryTableHandle.getLimit().isPresent() && bigQueryTableHandle.getLimit().getAsLong() <= limit) {
+ return Optional.empty();
+ }
+
+ bigQueryTableHandle = bigQueryTableHandle.withLimit(limit);
+
+ return Optional.of(new LimitApplicationResult<>(bigQueryTableHandle, false));
+ }
+
+ @Override
+ public Optional> applyProjection(
+ ConnectorSession session,
+ ConnectorTableHandle handle,
+ List projections,
+ Map assignments)
+ {
+ log.debug("applyProjection(session=%s, handle=%s, projections=%s, assignments=%s)",
+ session, handle, projections, assignments);
+ BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) handle;
+
+ if (bigQueryTableHandle.getProjectedColumns().isPresent()) {
+ return Optional.empty();
+ }
+
+ ImmutableList.Builder projectedColumns = ImmutableList.builder();
+ ImmutableList.Builder assignmentList = ImmutableList.builder();
+ assignments.forEach((name, column) -> {
+ projectedColumns.add(column);
+ assignmentList.add(new Assignment(name, column, ((BigQueryColumnHandle) column).getPrestoType()));
+ });
+
+ bigQueryTableHandle = bigQueryTableHandle.withProjectedColumns(projectedColumns.build());
+
+ return Optional.of(new ProjectionApplicationResult<>(bigQueryTableHandle, projections, assignmentList.build()));
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryPageSourceProvider.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryPageSourceProvider.java
new file mode 100644
index 00000000000..c26bb5a77c2
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryPageSourceProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.log.Logger;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorPageSource;
+import io.prestosql.spi.connector.ConnectorPageSourceProvider;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorSplit;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+
+public class BigQueryPageSourceProvider
+ implements ConnectorPageSourceProvider
+{
+ private static final Logger log = Logger.get(BigQueryPageSourceProvider.class);
+ private final BigQueryStorageClientFactory bigQueryStorageClientFactory;
+ private final int maxReadRowsRetries;
+
+ @Inject
+ public BigQueryPageSourceProvider(BigQueryStorageClientFactory bigQueryStorageClientFactory, BigQueryConfig config)
+ {
+ this.bigQueryStorageClientFactory = bigQueryStorageClientFactory;
+ this.maxReadRowsRetries = config.getMaxReadRowsRetries();
+ }
+
+ @Override
+ public ConnectorPageSource createPageSource(
+ ConnectorTransactionHandle transaction,
+ ConnectorSession session,
+ ConnectorSplit split,
+ ConnectorTableHandle table,
+ List columns)
+ {
+ log.debug("createPageSource(transaction=%s, session=%s, split=%s, table=%s, columns=%s)", transaction, session, split, table, columns);
+ BigQuerySplit bigQuerySplit = (BigQuerySplit) split;
+ if (bigQuerySplit.representsEmptyProjection()) {
+ return new BigQueryEmptyProjectionPageSource(bigQuerySplit.getEmptyRowsToGenerate());
+ }
+
+ // not empty projection
+ BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) table;
+ ImmutableList bigQueryColumnHandles = columns.stream()
+ .map(BigQueryColumnHandle.class::cast)
+ .collect(toImmutableList());
+
+ return new BigQueryResultPageSource(bigQueryStorageClientFactory, maxReadRowsRetries, bigQuerySplit, bigQueryTableHandle, bigQueryColumnHandles);
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryPlugin.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryPlugin.java
new file mode 100644
index 00000000000..b80e5a581a8
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryPlugin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.Plugin;
+import io.prestosql.spi.connector.ConnectorFactory;
+
+public class BigQueryPlugin
+ implements Plugin
+{
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ return ImmutableList.of(new BigQueryConnectorFactory());
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryResultPageSource.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryResultPageSource.java
new file mode 100644
index 00000000000..6e99cea0fd8
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryResultPageSource.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
+import com.google.cloud.bigquery.storage.v1beta1.Storage;
+import com.google.common.collect.ImmutableList;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import io.prestosql.spi.Page;
+import io.prestosql.spi.PageBuilder;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.block.Block;
+import io.prestosql.spi.block.BlockBuilder;
+import io.prestosql.spi.connector.ConnectorPageSource;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.DateTimeEncoding;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.TimeZoneKey;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.TypeSignatureParameter;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+import org.apache.avro.Conversions.DecimalConversion;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.util.Utf8;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.slice.Slices.utf8Slice;
+import static io.prestosql.plugin.bigquery.BigQueryMetadata.NUMERIC_DATA_TYPE_PRECISION;
+import static io.prestosql.plugin.bigquery.BigQueryMetadata.NUMERIC_DATA_TYPE_SCALE;
+import static io.prestosql.plugin.bigquery.BigQueryType.toPrestoTimestamp;
+import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static io.prestosql.spi.type.BigintType.BIGINT;
+import static io.prestosql.spi.type.DateType.DATE;
+import static io.prestosql.spi.type.IntegerType.INTEGER;
+import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
+import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
+import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
+import static java.lang.String.format;
+
+public class BigQueryResultPageSource
+ implements ConnectorPageSource
+{
+ static final AvroDecimalConverter DECIMAL_CONVERTER = new AvroDecimalConverter();
+ private static final Logger log = Logger.get(BigQueryResultPageSource.class);
+ private final BigQueryStorageClient bigQueryStorageClient;
+ private final int maxReadRowsRetries;
+ private final BigQuerySplit split;
+ private final BigQueryTableHandle table;
+ private final ImmutableList columns;
+ private final ImmutableList columnTypes;
+ private final AtomicLong readBytes;
+ private final PageBuilder pageBuilder;
+ private Iterator responses;
+ private boolean closed;
+
+ public BigQueryResultPageSource(
+ BigQueryStorageClientFactory bigQueryStorageClientFactory,
+ int maxReadRowsRetries,
+ BigQuerySplit split,
+ BigQueryTableHandle table,
+ ImmutableList columns)
+ {
+ this.bigQueryStorageClient = bigQueryStorageClientFactory.createBigQueryStorageClient();
+ this.maxReadRowsRetries = maxReadRowsRetries;
+ this.split = split;
+ this.table = table;
+ this.columns = columns;
+ this.readBytes = new AtomicLong();
+ this.columnTypes = columns.stream().map(BigQueryColumnHandle::getPrestoType).collect(toImmutableList());
+ this.pageBuilder = new PageBuilder(columnTypes);
+
+ log.debug("Starting to read from %s", split.getStreamName());
+ Storage.ReadRowsRequest.Builder readRowsRequest = Storage.ReadRowsRequest.newBuilder()
+ .setReadPosition(Storage.StreamPosition.newBuilder()
+ .setStream(Storage.Stream.newBuilder()
+ .setName(split.getStreamName())));
+ responses = new ReadRowsHelper(bigQueryStorageClient, readRowsRequest, maxReadRowsRetries).readRows();
+ closed = false;
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return readBytes.get();
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return !responses.hasNext();
+ }
+
+ @Override
+ public Page getNextPage()
+ {
+ checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page");
+ Storage.ReadRowsResponse response = responses.next();
+ Iterable records = parse(response);
+ for (GenericRecord record : records) {
+ pageBuilder.declarePosition();
+ for (int column = 0; column < columnTypes.size(); column++) {
+ BlockBuilder output = pageBuilder.getBlockBuilder(column);
+ appendTo(columnTypes.get(column), record.get(column), output);
+ }
+ }
+
+ Page page = pageBuilder.build();
+ pageBuilder.reset();
+ return page;
+ }
+
+ private void appendTo(Type type, Object value, BlockBuilder output)
+ {
+ if (value == null) {
+ output.appendNull();
+ return;
+ }
+
+ Class> javaType = type.getJavaType();
+ try {
+ if (javaType == boolean.class) {
+ type.writeBoolean(output, (Boolean) value);
+ }
+ else if (javaType == long.class) {
+ if (type.equals(BIGINT)) {
+ type.writeLong(output, ((Number) value).longValue());
+ }
+ else if (type.equals(INTEGER)) {
+ type.writeLong(output, ((Number) value).intValue());
+ }
+ else if (type.equals(DATE)) {
+ type.writeLong(output, ((Number) value).intValue());
+ }
+ else if (type.equals(TIMESTAMP)) {
+ type.writeLong(output, toPrestoTimestamp(((Utf8) value).toString()));
+ }
+ else if (type.equals(TIME_WITH_TIME_ZONE)) {
+ type.writeLong(output, DateTimeEncoding.packDateTimeWithZone(((Long) value).longValue() / 1000, TimeZoneKey.UTC_KEY));
+ }
+ else if (type.equals(TIMESTAMP_WITH_TIME_ZONE)) {
+ type.writeLong(output, DateTimeEncoding.packDateTimeWithZone(((Long) value).longValue() / 1000, TimeZoneKey.UTC_KEY));
+ }
+ else {
+ throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
+ }
+ }
+ else if (javaType == double.class) {
+ type.writeDouble(output, ((Number) value).doubleValue());
+ }
+ else if (javaType == Slice.class) {
+ writeSlice(output, type, value);
+ }
+ else if (javaType == Block.class) {
+ writeBlock(output, type, value);
+ }
+ else {
+ throw new PrestoException(GENERIC_INTERNAL_ERROR, format("Unhandled type for %s: %s", javaType.getSimpleName(), type));
+ }
+ }
+ catch (ClassCastException ignore) {
+ // returns null instead of raising exception
+ output.appendNull();
+ }
+ }
+
+ private void writeSlice(BlockBuilder output, Type type, Object value)
+ {
+ if (type instanceof VarcharType) {
+ type.writeSlice(output, utf8Slice(((Utf8) value).toString()));
+ }
+ else if (type instanceof DecimalType) {
+ BigDecimal bdValue = DECIMAL_CONVERTER.convert(value);
+ type.writeSlice(output, Decimals.encodeScaledValue(bdValue, NUMERIC_DATA_TYPE_SCALE));
+ }
+ else if (type instanceof VarbinaryType) {
+ if (value instanceof ByteBuffer) {
+ type.writeSlice(output, Slices.wrappedBuffer((ByteBuffer) value));
+ }
+ else {
+ output.appendNull();
+ }
+ }
+ else {
+ throw new PrestoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Slice: " + type.getTypeSignature());
+ }
+ }
+
+ private void writeBlock(BlockBuilder output, Type type, Object value)
+ {
+ if (type instanceof ArrayType && value instanceof List>) {
+ BlockBuilder builder = output.beginBlockEntry();
+
+ for (Object element : (List>) value) {
+ appendTo(type.getTypeParameters().get(0), element, builder);
+ }
+
+ output.closeEntry();
+ return;
+ }
+ if (type instanceof RowType && value instanceof GenericRecord) {
+ GenericRecord record = (GenericRecord) value;
+ BlockBuilder builder = output.beginBlockEntry();
+
+ List fieldNames = new ArrayList<>();
+ for (int i = 0; i < type.getTypeSignature().getParameters().size(); i++) {
+ TypeSignatureParameter parameter = type.getTypeSignature().getParameters().get(i);
+ fieldNames.add(parameter.getNamedTypeSignature().getName().orElse("field" + i));
+ }
+ checkState(fieldNames.size() == type.getTypeParameters().size(), "fieldName doesn't match with type size : %s", type);
+ for (int index = 0; index < type.getTypeParameters().size(); index++) {
+ appendTo(type.getTypeParameters().get(index), record.get(fieldNames.get(index)), builder);
+ }
+ output.closeEntry();
+ return;
+ }
+ throw new PrestoException(GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
+ }
+
+ @Override
+ public long getSystemMemoryUsage()
+ {
+ return 0;
+ }
+
+ @Override
+ public void close()
+ throws IOException
+ {
+ bigQueryStorageClient.close();
+ closed = true;
+ }
+
+ Iterable parse(Storage.ReadRowsResponse response)
+ {
+ byte[] buffer = response.getAvroRows().getSerializedBinaryRows().toByteArray();
+ readBytes.addAndGet(buffer.length);
+ log.debug("Read %d bytes (total %d) from %s", buffer.length, readBytes.get(), split.getStreamName());
+ Schema avroSchema = new Schema.Parser().parse(split.getAvroSchema());
+ return () -> new AvroBinaryIterator(avroSchema, buffer);
+ }
+
+ Stream toRecords(Storage.ReadRowsResponse response)
+ {
+ byte[] buffer = response.getAvroRows().getSerializedBinaryRows().toByteArray();
+ readBytes.addAndGet(buffer.length);
+ log.debug("Read %d bytes (total %d) from %s", buffer.length, readBytes.get(), split.getStreamName());
+ Schema avroSchema = new Schema.Parser().parse(split.getAvroSchema());
+ Iterable responseRecords = () -> new AvroBinaryIterator(avroSchema, buffer);
+ return StreamSupport.stream(responseRecords.spliterator(), false);
+ }
+
+ static class AvroBinaryIterator
+ implements Iterator
+ {
+ GenericDatumReader reader;
+ BinaryDecoder in;
+
+ AvroBinaryIterator(Schema avroSchema, byte[] buffer)
+ {
+ this.reader = new GenericDatumReader<>(avroSchema);
+ this.in = new DecoderFactory().binaryDecoder(buffer, null);
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ try {
+ return !in.isEnd();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException("Error determining the end of Avro buffer", e);
+ }
+ }
+
+ @Override
+ public GenericRecord next()
+ {
+ try {
+ return reader.read(null, in);
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException("Error reading next Avro Record", e);
+ }
+ }
+ }
+
+ static class AvroDecimalConverter
+ {
+ private static final DecimalConversion AVRO_DECIMAL_CONVERSION = new DecimalConversion();
+ private static final Schema AVRO_DECIMAL_SCHEMA = new Schema.Parser().parse(format(
+ "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":%d,\"scale\":%d}",
+ NUMERIC_DATA_TYPE_PRECISION, NUMERIC_DATA_TYPE_SCALE));
+
+ BigDecimal convert(Object value)
+ {
+ return AVRO_DECIMAL_CONVERSION.fromBytes((ByteBuffer) value, AVRO_DECIMAL_SCHEMA, AVRO_DECIMAL_SCHEMA.getLogicalType());
+ }
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQuerySplit.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQuerySplit.java
new file mode 100644
index 00000000000..d34f4b2381f
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQuerySplit.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.HostAddress;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorSplit;
+
+import java.util.List;
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class BigQuerySplit
+ implements ConnectorSplit
+{
+ private static final int NO_ROWS_TO_GENERATE = -1;
+
+ private final String streamName;
+ private final String avroSchema;
+ private final List columns;
+ private final long emptyRowsToGenerate;
+
+ // do not use directly, it is public only for Jackson
+ @JsonCreator
+ public BigQuerySplit(
+ @JsonProperty("streamName") String streamName,
+ @JsonProperty("avroSchema") String avroSchema,
+ @JsonProperty("columns") List columns,
+ @JsonProperty("emptyRowsToGenerate") long emptyRowsToGenerate)
+ {
+ this.streamName = requireNonNull(streamName, "streamName cannot be null");
+ this.avroSchema = requireNonNull(avroSchema, "avroSchema cannot be null");
+ this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns cannot be null"));
+ this.emptyRowsToGenerate = emptyRowsToGenerate;
+ }
+
+ static BigQuerySplit forStream(String streamName, String avroSchema, List columns)
+ {
+ return new BigQuerySplit(streamName, avroSchema, columns, NO_ROWS_TO_GENERATE);
+ }
+
+ static BigQuerySplit emptyProjection(long numberOfRows)
+ {
+ return new BigQuerySplit("", "", ImmutableList.of(), numberOfRows);
+ }
+
+ @JsonProperty
+ public String getStreamName()
+ {
+ return streamName;
+ }
+
+ @JsonProperty
+ public String getAvroSchema()
+ {
+ return avroSchema;
+ }
+
+ @JsonProperty
+ public List getColumns()
+ {
+ return columns;
+ }
+
+ @JsonProperty
+ public long getEmptyRowsToGenerate()
+ {
+ return emptyRowsToGenerate;
+ }
+
+ @Override
+ public boolean isRemotelyAccessible()
+ {
+ return true;
+ }
+
+ @Override
+ public List getAddresses()
+ {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public Object getInfo()
+ {
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BigQuerySplit that = (BigQuerySplit) o;
+ return Objects.equals(streamName, that.streamName) &&
+ Objects.equals(avroSchema, that.avroSchema) &&
+ Objects.equals(columns, that.columns) &&
+ Objects.equals(emptyRowsToGenerate, that.emptyRowsToGenerate);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(streamName, avroSchema, columns, emptyRowsToGenerate);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("streamName", streamName)
+ .add("avroSchema", avroSchema)
+ .add("columns", columns)
+ .add("emptyRowsToGenerate", emptyRowsToGenerate)
+ .toString();
+ }
+
+ boolean representsEmptyProjection()
+ {
+ return emptyRowsToGenerate != NO_ROWS_TO_GENERATE;
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQuerySplitManager.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQuerySplitManager.java
new file mode 100644
index 00000000000..c1f173b4602
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQuerySplitManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
+import com.google.common.collect.ImmutableList;
+import io.airlift.log.Logger;
+import io.prestosql.spi.NodeManager;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorSession;
+import io.prestosql.spi.connector.ConnectorSplitManager;
+import io.prestosql.spi.connector.ConnectorSplitSource;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+import io.prestosql.spi.connector.FixedSplitSource;
+
+import javax.inject.Inject;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.prestosql.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+
+public class BigQuerySplitManager
+ implements ConnectorSplitManager
+{
+ private static final Logger log = Logger.get(BigQuerySplitManager.class);
+
+ private final BigQueryClient bigQueryClient;
+ private final BigQueryStorageClientFactory bigQueryStorageClientFactory;
+ private final OptionalInt parallelism;
+ private final ReadSessionCreatorConfig readSessionCreatorConfig;
+ private final NodeManager nodeManager;
+
+ @Inject
+ public BigQuerySplitManager(
+ BigQueryConfig config,
+ BigQueryClient bigQueryClient,
+ BigQueryStorageClientFactory bigQueryStorageClientFactory,
+ NodeManager nodeManager)
+ {
+ requireNonNull(config, "config cannot be null");
+
+ this.bigQueryClient = requireNonNull(bigQueryClient, "bigQueryClient cannot be null");
+ this.bigQueryStorageClientFactory = requireNonNull(bigQueryStorageClientFactory, "bigQueryStorageClientFactory cannot be null");
+ this.parallelism = config.getParallelism();
+ this.readSessionCreatorConfig = config.createReadSessionCreatorConfig();
+ this.nodeManager = requireNonNull(nodeManager, "nodeManager cannot be null");
+ }
+
+ @Override
+ public ConnectorSplitSource getSplits(
+ ConnectorTransactionHandle transaction,
+ ConnectorSession session,
+ ConnectorTableHandle table,
+ SplitSchedulingStrategy splitSchedulingStrategy)
+ {
+ log.debug("getSplits(transaction=%s, session=%s, table=%s, splitSchedulingStrategy=%s)", transaction, session, table, splitSchedulingStrategy);
+ BigQueryTableHandle bigQueryTableHandle = (BigQueryTableHandle) table;
+
+ TableId tableId = bigQueryTableHandle.getTableId();
+ int actualParallelism = parallelism.orElse(nodeManager.getRequiredWorkerNodes().size());
+ Optional filter = Optional.empty();
+ List splits = emptyProjectionIsRequired(bigQueryTableHandle.getProjectedColumns()) ?
+ createEmptyProjection(tableId, actualParallelism, filter) :
+ readFromBigQuery(tableId, bigQueryTableHandle.getProjectedColumns(), actualParallelism, filter);
+ return new FixedSplitSource(splits);
+ }
+
+ private boolean emptyProjectionIsRequired(Optional> projectedColumns)
+ {
+ return projectedColumns.isPresent() && projectedColumns.get().isEmpty();
+ }
+
+ private ImmutableList readFromBigQuery(TableId tableId, Optional> projectedColumns, int actualParallelism, Optional filter)
+ {
+ log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, actualParallelism=%s, filter=[%s])", tableId, projectedColumns, actualParallelism, filter);
+ List columns = projectedColumns.orElse(ImmutableList.of());
+ ImmutableList projectedColumnsNames = columns.stream()
+ .map(column -> ((BigQueryColumnHandle) column).getName())
+ .collect(toImmutableList());
+
+ ReadSession readSession = new ReadSessionCreator(readSessionCreatorConfig, bigQueryClient, bigQueryStorageClientFactory)
+ .create(tableId, projectedColumnsNames, filter, actualParallelism);
+
+ return readSession.getStreamsList().stream()
+ .map(stream -> BigQuerySplit.forStream(stream.getName(), readSession.getAvroSchema().getSchema(), columns))
+ .collect(toImmutableList());
+ }
+
+ private List createEmptyProjection(TableId tableId, int actualParallelism, Optional filter)
+ {
+ log.debug("createEmptyProjection(tableId=%s, actualParallelism=%s, filter=[%s])", tableId, actualParallelism, filter);
+ try {
+ long numberOfRows;
+ if (filter.isPresent()) {
+ // count the rows based on the filter
+ String sql = bigQueryClient.createSql(tableId, "COUNT(*)", new String[] {filter.get()});
+ TableResult result = bigQueryClient.query(sql);
+ numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue();
+ }
+ else {
+ // no filters, so we can take the value from the table info
+ numberOfRows = bigQueryClient.getTable(tableId).getNumRows().longValue();
+ }
+
+ long rowsPerSplit = numberOfRows / actualParallelism;
+ long remainingRows = numberOfRows - (rowsPerSplit * actualParallelism); // need to be added to one fo the split due to integer division
+ List splits = range(0, actualParallelism)
+ .mapToObj(ignored -> BigQuerySplit.emptyProjection(rowsPerSplit))
+ .collect(toList());
+ splits.set(0, BigQuerySplit.emptyProjection(rowsPerSplit + remainingRows));
+ return splits;
+ }
+ catch (BigQueryException e) {
+ throw new PrestoException(BIGQUERY_FAILED_TO_EXECUTE_QUERY, format("Failed to compute empty projection"), e);
+ }
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryStorageClientFactory.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryStorageClientFactory.java
new file mode 100644
index 00000000000..2c5128779d0
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryStorageClientFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.HeaderProvider;
+import com.google.auth.Credentials;
+import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
+import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings;
+
+import javax.inject.Inject;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Optional;
+
+/**
+ * Since Guice recommends to avoid injecting closeable resources (see
+ * https://github.com/google/guice/wiki/Avoid-Injecting-Closable-Resources), this factory creates
+ * short lived clients that can be closed independently.
+ */
+public class BigQueryStorageClientFactory
+{
+ private final Optional credentials;
+ private final HeaderProvider headerProvider;
+
+ @Inject
+ public BigQueryStorageClientFactory(BigQueryCredentialsSupplier bigQueryCredentialsSupplier, HeaderProvider headerProvider)
+ {
+ this.credentials = bigQueryCredentialsSupplier.getCredentials();
+ this.headerProvider = headerProvider;
+ }
+
+ BigQueryStorageClient createBigQueryStorageClient()
+ {
+ try {
+ BigQueryStorageSettings.Builder clientSettings = BigQueryStorageSettings.newBuilder()
+ .setTransportChannelProvider(
+ BigQueryStorageSettings.defaultGrpcTransportProviderBuilder()
+ .setHeaderProvider(headerProvider)
+ .build());
+ credentials.ifPresent(credentials ->
+ clientSettings.setCredentialsProvider(FixedCredentialsProvider.create(credentials)));
+ return BigQueryStorageClient.create(clientSettings.build());
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException("Error creating BigQueryStorageClient", e);
+ }
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryTableHandle.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryTableHandle.java
new file mode 100644
index 00000000000..b41c2cb2706
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryTableHandle.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import io.prestosql.spi.connector.ColumnHandle;
+import io.prestosql.spi.connector.ConnectorTableHandle;
+import io.prestosql.spi.predicate.TupleDomain;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class BigQueryTableHandle
+ implements ConnectorTableHandle
+{
+ private final String projectId;
+ private final String schemaName;
+ private final String tableName;
+ private final String type;
+ private final TupleDomain constraint;
+ private final Optional> projectedColumns;
+ private final OptionalLong limit;
+
+ @JsonCreator
+ public BigQueryTableHandle(
+ @JsonProperty("projectId") String projectId,
+ @JsonProperty("schemaName") String schemaName,
+ @JsonProperty("tableName") String tableName,
+ @JsonProperty("type") String type,
+ @JsonProperty("constraint") TupleDomain constraint,
+ @JsonProperty("projectedColumns") Optional> projectedColumns,
+ @JsonProperty("limit") OptionalLong limit)
+ {
+ this.projectId = requireNonNull(projectId, "projectId is null");
+ this.schemaName = requireNonNull(schemaName, "schemaName is null");
+ this.tableName = requireNonNull(tableName, "tableName is null");
+ this.type = requireNonNull(type, "type is null");
+ this.constraint = requireNonNull(constraint, "constraint is null");
+ this.projectedColumns = requireNonNull(projectedColumns, "projectedColumns is null");
+ this.limit = requireNonNull(limit, "limit is null");
+ }
+
+ public static BigQueryTableHandle from(TableInfo tableInfo)
+ {
+ TableId tableId = tableInfo.getTableId();
+ String type = tableInfo.getDefinition().getType().toString();
+ return new BigQueryTableHandle(tableId.getProject(), tableId.getDataset(), tableId.getTable(), type, TupleDomain.none(), Optional.empty(), OptionalLong.empty());
+ }
+
+ @JsonProperty
+ public String getProjectId()
+ {
+ return projectId;
+ }
+
+ @JsonProperty
+ public String getSchemaName()
+ {
+ return schemaName;
+ }
+
+ @JsonProperty
+ public String getTableName()
+ {
+ return tableName;
+ }
+
+ @JsonProperty
+ public String getType()
+ {
+ return type;
+ }
+
+ @JsonProperty
+ public TupleDomain getConstraint()
+ {
+ return constraint;
+ }
+
+ @JsonProperty
+ public Optional> getProjectedColumns()
+ {
+ return projectedColumns;
+ }
+
+ @JsonProperty
+ public OptionalLong getLimit()
+ {
+ return limit;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BigQueryTableHandle that = (BigQueryTableHandle) o;
+ return Objects.equals(projectId, that.projectId) &&
+ Objects.equals(schemaName, that.schemaName) &&
+ Objects.equals(tableName, that.tableName) &&
+ Objects.equals(type, that.tableName) &&
+ Objects.equals(constraint, that.constraint) &&
+ Objects.equals(projectedColumns, that.projectedColumns) &&
+ Objects.equals(limit, that.limit);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(projectId, schemaName, tableName, type, constraint, projectedColumns, limit);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("projectId", projectId)
+ .add("schemaName", schemaName)
+ .add("tableName", tableName)
+ .add("type", type)
+ .add("constraint", constraint)
+ .add("projectedColumns", projectedColumns)
+ .add("limit", limit)
+ .toString();
+ }
+
+ public TableId getTableId()
+ {
+ return TableId.of(projectId, schemaName, tableName);
+ }
+
+ BigQueryTableHandle withConstraint(TupleDomain newConstraint)
+ {
+ return new BigQueryTableHandle(projectId, schemaName, tableName, type, newConstraint, projectedColumns, limit);
+ }
+
+ BigQueryTableHandle withProjectedColumns(List newProjectedColumns)
+ {
+ return new BigQueryTableHandle(projectId, schemaName, tableName, type, constraint, Optional.of(newProjectedColumns), limit);
+ }
+
+ BigQueryTableHandle withLimit(long newLimit)
+ {
+ return new BigQueryTableHandle(projectId, schemaName, tableName, type, constraint, projectedColumns, OptionalLong.of(newLimit));
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryTransactionHandle.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryTransactionHandle.java
new file mode 100644
index 00000000000..f7ec333b999
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryTransactionHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import io.prestosql.spi.connector.ConnectorTransactionHandle;
+
+public enum BigQueryTransactionHandle
+ implements ConnectorTransactionHandle
+{
+ INSTANCE
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryType.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryType.java
new file mode 100644
index 00000000000..8b9ecb96a1a
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryType.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.Field;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.spi.type.ArrayType;
+import io.prestosql.spi.type.BigintType;
+import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.DoubleType;
+import io.prestosql.spi.type.RowType;
+import io.prestosql.spi.type.TimeWithTimeZoneType;
+import io.prestosql.spi.type.TimestampType;
+import io.prestosql.spi.type.TimestampWithTimeZoneType;
+import io.prestosql.spi.type.Type;
+import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
+
+import java.time.LocalDateTime;
+import java.time.Month;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.prestosql.plugin.bigquery.BigQueryMetadata.NUMERIC_DATA_TYPE_PRECISION;
+import static io.prestosql.plugin.bigquery.BigQueryMetadata.NUMERIC_DATA_TYPE_SCALE;
+import static io.prestosql.spi.type.VarcharType.createUnboundedVarcharType;
+import static java.lang.Integer.parseInt;
+import static java.time.Month.APRIL;
+import static java.time.Month.AUGUST;
+import static java.time.Month.DECEMBER;
+import static java.time.Month.FEBRUARY;
+import static java.time.Month.JANUARY;
+import static java.time.Month.JULY;
+import static java.time.Month.JUNE;
+import static java.time.Month.MARCH;
+import static java.time.Month.MAY;
+import static java.time.Month.NOVEMBER;
+import static java.time.Month.OCTOBER;
+import static java.time.Month.SEPTEMBER;
+import static java.time.ZoneOffset.systemDefault;
+import static java.util.stream.Collectors.toList;
+
+public enum BigQueryType
+{
+ BOOLEAN(BooleanType.BOOLEAN),
+ BYTES(VarbinaryType.VARBINARY),
+ DATE(DateType.DATE),
+ DATETIME(TimestampType.TIMESTAMP),
+ FLOAT(DoubleType.DOUBLE),
+ GEOGRAPHY(VarcharType.VARCHAR),
+ INTEGER(BigintType.BIGINT),
+ NUMERIC(DecimalType.createDecimalType(NUMERIC_DATA_TYPE_PRECISION, NUMERIC_DATA_TYPE_SCALE)),
+ RECORD(null),
+ STRING(createUnboundedVarcharType()),
+ TIME(TimeWithTimeZoneType.TIME_WITH_TIME_ZONE),
+ TIMESTAMP(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE);
+
+ private static final int[] NANO_FACTOR = {
+ -1, // 0, no need to multiply
+ 100_000_000, // 1 digit after the dot
+ 10_000_000, // 2 digits after the dot
+ 1_000_000, // 3 digits after the dot
+ 100_000, // 4 digits after the dot
+ 10_000, // 5 digits after the dot
+ 1000, // 6 digits after the dot
+ 100, // 7 digits after the dot
+ 10, // 8 digits after the dot
+ 1, // 9 digits after the dot
+ };
+ private static final ImmutableMap MONTH = ImmutableMap.builder()
+ .put("01", JANUARY)
+ .put("02", FEBRUARY)
+ .put("03", MARCH)
+ .put("04", APRIL)
+ .put("05", MAY)
+ .put("06", JUNE)
+ .put("07", JULY)
+ .put("08", AUGUST)
+ .put("09", SEPTEMBER)
+ .put("10", OCTOBER)
+ .put("11", NOVEMBER)
+ .put("12", DECEMBER)
+ .build();
+ private final Type nativeType;
+
+ BigQueryType(Type nativeType)
+ {
+ this.nativeType = nativeType;
+ }
+
+ static RowType.Field toRawTypeField(Map.Entry entry)
+ {
+ return toRawTypeField(entry.getKey(), entry.getValue());
+ }
+
+ static RowType.Field toRawTypeField(String name, BigQueryType.Adaptor typeAdaptor)
+ {
+ Type prestoType = typeAdaptor.getPrestoType();
+ return RowType.field(name, prestoType);
+ }
+
+ static LocalDateTime toLocalDateTime(String datetime)
+ {
+ int dotPosition = datetime.indexOf('.');
+ if (dotPosition == -1) {
+ // no sub-second element
+ return LocalDateTime.from(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(datetime));
+ }
+ LocalDateTime result = LocalDateTime.from(DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(datetime.substring(0, dotPosition)));
+ // has sub-second element, so convert to nanosecond
+ String nanosStr = datetime.substring(dotPosition + 1);
+ int nanoOfSecond = parseInt(nanosStr) * NANO_FACTOR[nanosStr.length()];
+ return result.withNano(nanoOfSecond);
+ }
+
+ static long toPrestoTimestamp(String datetime)
+ {
+ return toLocalDateTime(datetime).atZone(systemDefault()).toInstant().toEpochMilli();
+ }
+
+ public Type getNativeType(BigQueryType.Adaptor typeAdaptor)
+ {
+ switch (this) {
+ case RECORD:
+ // create the row
+ Map subTypes = typeAdaptor.getBigQuerySubTypes();
+ checkArgument(!subTypes.isEmpty(), "a record or struct must have sub-fields");
+ List fields = subTypes.entrySet().stream().map(BigQueryType::toRawTypeField).collect(toList());
+ return RowType.from(fields);
+ default:
+ return nativeType;
+ }
+ }
+
+ interface Adaptor
+ {
+ BigQueryType getBigQueryType();
+
+ Map getBigQuerySubTypes();
+
+ Field.Mode getMode();
+
+ default Type getPrestoType()
+ {
+ Type rawType = getBigQueryType().getNativeType(this);
+ return getMode() == Field.Mode.REPEATED ? new ArrayType(rawType) : rawType;
+ }
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryUtil.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryUtil.java
new file mode 100644
index 00000000000..0bb6f4fad5c
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/BigQueryUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.BigQueryError;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.common.collect.ImmutableSet;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+
+import static com.google.cloud.http.BaseHttpServiceException.UNKNOWN_CODE;
+import static com.google.common.base.Throwables.getCausalChain;
+
+class BigQueryUtil
+{
+ static final ImmutableSet INTERNAL_ERROR_MESSAGES = ImmutableSet.of(
+ "HTTP/2 error code: INTERNAL_ERROR",
+ "Connection closed with unknown cause",
+ "Received unexpected EOS on DATA frame from server");
+
+ private BigQueryUtil() {}
+
+ static boolean isRetryable(Throwable cause)
+ {
+ return getCausalChain(cause).stream().anyMatch(BigQueryUtil::isRetryableInternalError);
+ }
+
+ static boolean isRetryableInternalError(Throwable t)
+ {
+ if (t instanceof StatusRuntimeException) {
+ StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t;
+ return statusRuntimeException.getStatus().getCode() == Status.Code.INTERNAL &&
+ INTERNAL_ERROR_MESSAGES.stream()
+ .anyMatch(message -> statusRuntimeException.getMessage().contains(message));
+ }
+ return false;
+ }
+
+ static BigQueryException convertToBigQueryException(BigQueryError error)
+ {
+ return new BigQueryException(UNKNOWN_CODE, error.getMessage(), error);
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/Conversions.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/Conversions.java
new file mode 100644
index 00000000000..141d2fd1fc0
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/Conversions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FieldList;
+import com.google.common.collect.ImmutableMap;
+import io.prestosql.spi.connector.ColumnMetadata;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+
+class Conversions
+{
+ private Conversions() {}
+
+ static BigQueryColumnHandle toColumnHandle(Field field)
+ {
+ FieldList subFields = field.getSubFields();
+ List subColumns = subFields == null ?
+ Collections.emptyList() :
+ subFields.stream()
+ .map(Conversions::toColumnHandle)
+ .collect(Collectors.toList());
+ return new BigQueryColumnHandle(
+ field.getName(),
+ BigQueryType.valueOf(field.getType().name()),
+ field.getMode(),
+ subColumns,
+ field.getDescription());
+ }
+
+ static ColumnMetadata toColumnMetadata(Field field)
+ {
+ return ColumnMetadata.builder()
+ .setName(field.getName())
+ .setType(adapt(field).getPrestoType())
+ .setComment(Optional.ofNullable(field.getDescription()))
+ .setNullable(field.getMode() == Field.Mode.NULLABLE)
+ .build();
+ }
+
+ static BigQueryType.Adaptor adapt(final Field field)
+ {
+ return new BigQueryType.Adaptor()
+ {
+ @Override
+ public BigQueryType getBigQueryType()
+ {
+ return BigQueryType.valueOf(field.getType().name());
+ }
+
+ @Override
+ public ImmutableMap getBigQuerySubTypes()
+ {
+ FieldList subFields = field.getSubFields();
+ if (subFields == null) {
+ return ImmutableMap.of();
+ }
+ return subFields.stream().collect(toImmutableMap(Field::getName, Conversions::adapt));
+ }
+
+ @Override
+ public Field.Mode getMode()
+ {
+ return field.getMode();
+ }
+ };
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadRowsHelper.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadRowsHelper.java
new file mode 100644
index 00000000000..4322ac4a4dd
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadRowsHelper.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+public class ReadRowsHelper
+{
+ private BigQueryStorageClient client;
+ private ReadRowsRequest.Builder request;
+ private int maxReadRowsRetries;
+
+ public ReadRowsHelper(BigQueryStorageClient client, ReadRowsRequest.Builder request, int maxReadRowsRetries)
+ {
+ this.client = requireNonNull(client, "client cannot be null");
+ this.request = requireNonNull(request, "client cannot be null");
+ this.maxReadRowsRetries = maxReadRowsRetries;
+ }
+
+ public Iterator readRows()
+ {
+ List readRowResponses = new ArrayList<>();
+ long readRowsCount = 0;
+ int retries = 0;
+ Iterator serverResponses = fetchResponses(request);
+ while (serverResponses.hasNext()) {
+ try {
+ ReadRowsResponse response = serverResponses.next();
+ readRowsCount += response.getRowCount();
+ readRowResponses.add(response);
+ }
+ catch (RuntimeException e) {
+ // if relevant, retry the read, from the last read position
+ if (BigQueryUtil.isRetryable(e) && retries < maxReadRowsRetries) {
+ request.getReadPositionBuilder().setOffset(readRowsCount);
+ serverResponses = fetchResponses(request);
+ retries++;
+ }
+ else {
+ // to safely close the client
+ try (BigQueryStorageClient ignored = client) {
+ throw e;
+ }
+ }
+ }
+ }
+ return readRowResponses.iterator();
+ }
+
+ // In order to enable testing
+ protected Iterator fetchResponses(ReadRowsRequest.Builder readRowsRequest)
+ {
+ return client.readRowsCallable()
+ .call(readRowsRequest.build())
+ .iterator();
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadSessionCreator.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadSessionCreator.java
new file mode 100644
index 00000000000..970541f3e58
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadSessionCreator.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.BaseServiceException;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
+import com.google.cloud.bigquery.storage.v1beta1.ReadOptions;
+import com.google.cloud.bigquery.storage.v1beta1.Storage;
+import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
+import io.airlift.log.Logger;
+import io.prestosql.spi.PrestoException;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static io.prestosql.plugin.bigquery.BigQueryErrorCode.BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED;
+import static io.prestosql.plugin.bigquery.BigQueryUtil.convertToBigQueryException;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.lang.String.format;
+
+// A helper class, also handles view materialization
+public class ReadSessionCreator
+{
+ private static final Logger log = Logger.get(ReadSessionCreator.class);
+
+ private static Cache destinationTableCache =
+ CacheBuilder.newBuilder()
+ .expireAfterWrite(15, TimeUnit.MINUTES)
+ .maximumSize(1000)
+ .build();
+
+ private final ReadSessionCreatorConfig config;
+ private final BigQueryClient bigQueryClient;
+ private final BigQueryStorageClientFactory bigQueryStorageClientFactory;
+
+ public ReadSessionCreator(
+ ReadSessionCreatorConfig config,
+ BigQueryClient bigQueryClient,
+ BigQueryStorageClientFactory bigQueryStorageClientFactory)
+ {
+ this.config = config;
+ this.bigQueryClient = bigQueryClient;
+ this.bigQueryStorageClientFactory = bigQueryStorageClientFactory;
+ }
+
+ public Storage.ReadSession create(TableId table, ImmutableList selectedFields, Optional filter, int parallelism)
+ {
+ TableInfo tableDetails = bigQueryClient.getTable(table);
+
+ TableInfo actualTable = getActualTable(tableDetails, selectedFields, new String[] {});
+
+ try (BigQueryStorageClient bigQueryStorageClient = bigQueryStorageClientFactory.createBigQueryStorageClient()) {
+ ReadOptions.TableReadOptions.Builder readOptions = ReadOptions.TableReadOptions.newBuilder()
+ .addAllSelectedFields(selectedFields);
+ filter.ifPresent(readOptions::setRowRestriction);
+
+ TableReferenceProto.TableReference tableReference = toTableReference(actualTable.getTableId());
+
+ Storage.ReadSession readSession = bigQueryStorageClient.createReadSession(
+ Storage.CreateReadSessionRequest.newBuilder()
+ .setParent("projects/" + config.parentProject)
+ .setFormat(Storage.DataFormat.AVRO)
+ .setRequestedStreams(parallelism)
+ .setReadOptions(readOptions)
+ .setTableReference(tableReference)
+ // The BALANCED sharding strategy causes the server to
+ // assign roughly the same number of rows to each stream.
+ .setShardingStrategy(Storage.ShardingStrategy.BALANCED)
+ .build());
+
+ return readSession;
+ }
+ }
+
+ TableReferenceProto.TableReference toTableReference(TableId tableId)
+ {
+ return TableReferenceProto.TableReference.newBuilder()
+ .setProjectId(tableId.getProject())
+ .setDatasetId(tableId.getDataset())
+ .setTableId(tableId.getTable())
+ .build();
+ }
+
+ TableInfo getActualTable(
+ TableInfo table,
+ ImmutableList requiredColumns,
+ String[] filters)
+ {
+ TableDefinition tableDefinition = table.getDefinition();
+ TableDefinition.Type tableType = tableDefinition.getType();
+ if (TableDefinition.Type.TABLE == tableType) {
+ return table;
+ }
+ if (TableDefinition.Type.VIEW == tableType) {
+ if (!config.viewsEnabled) {
+ throw new PrestoException(NOT_SUPPORTED, format(
+ "Views are not enabled. You can enable views by setting '%s' to true. Notice additional cost may occur.",
+ BigQueryConfig.VIEWS_ENABLED));
+ }
+ // get it from the view
+ String querySql = bigQueryClient.createSql(table.getTableId(), requiredColumns, filters);
+ log.debug("querySql is %s", querySql);
+ try {
+ return destinationTableCache.get(querySql, new DestinationTableBuilder(bigQueryClient, config, querySql, table.getTableId()));
+ }
+ catch (ExecutionException e) {
+ throw new PrestoException(BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED, "Error creating destination table", e);
+ }
+ }
+ else {
+ // not regular table or a view
+ throw new PrestoException(NOT_SUPPORTED, format("Table type '%s' of table '%s.%s' is not supported",
+ tableType, table.getTableId().getDataset(), table.getTableId().getTable()));
+ }
+ }
+
+ static class DestinationTableBuilder
+ implements Callable
+ {
+ final BigQueryClient bigQueryClient;
+ final ReadSessionCreatorConfig config;
+ final String querySql;
+ final TableId table;
+
+ DestinationTableBuilder(BigQueryClient bigQueryClient, ReadSessionCreatorConfig config, String querySql, TableId table)
+ {
+ this.bigQueryClient = bigQueryClient;
+ this.config = config;
+ this.querySql = querySql;
+ this.table = table;
+ }
+
+ @Override
+ public TableInfo call()
+ {
+ return createTableFromQuery();
+ }
+
+ TableInfo createTableFromQuery()
+ {
+ TableId destinationTable = bigQueryClient.createDestinationTable(table);
+ log.debug("destinationTable is %s", destinationTable);
+ JobInfo jobInfo = JobInfo.of(
+ QueryJobConfiguration
+ .newBuilder(querySql)
+ .setDestinationTable(destinationTable)
+ .build());
+ log.debug("running query %s", jobInfo);
+ Job job = waitForJob(bigQueryClient.create(jobInfo));
+ log.debug("job has finished. %s", job);
+ if (job.getStatus().getError() != null) {
+ throw convertToBigQueryException(job.getStatus().getError());
+ }
+ // add expiration time to the table
+ TableInfo createdTable = bigQueryClient.getTable(destinationTable);
+ long expirationTime = createdTable.getCreationTime() +
+ TimeUnit.HOURS.toMillis(config.viewExpirationTimeInHours);
+ Table updatedTable = bigQueryClient.update(createdTable.toBuilder()
+ .setExpirationTime(expirationTime)
+ .build());
+ return updatedTable;
+ }
+
+ Job waitForJob(Job job)
+ {
+ try {
+ return job.waitFor();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new BigQueryException(BaseServiceException.UNKNOWN_CODE, format("Job %s has been interrupted", job.getJobId()), e);
+ }
+ }
+ }
+}
diff --git a/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadSessionCreatorConfig.java b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadSessionCreatorConfig.java
new file mode 100644
index 00000000000..4e7d56bb791
--- /dev/null
+++ b/presto-bigquery/src/main/java/io/prestosql/plugin/bigquery/ReadSessionCreatorConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import java.util.Optional;
+
+class ReadSessionCreatorConfig
+{
+ final String parentProject;
+ final boolean viewsEnabled;
+ final Optional viewMaterializationProject;
+ final Optional viewMaterializationDataset;
+ final int viewExpirationTimeInHours;
+ final int maxReadRowsRetries;
+
+ ReadSessionCreatorConfig(
+ String parentProject,
+ boolean viewsEnabled,
+ Optional viewMaterializationProject,
+ Optional viewMaterializationDataset,
+ int viewExpirationTimeInHours,
+ int maxReadRowsRetries)
+ {
+ this.parentProject = parentProject;
+ this.viewsEnabled = viewsEnabled;
+ this.viewMaterializationProject = viewMaterializationProject;
+ this.viewMaterializationDataset = viewMaterializationDataset;
+ this.viewExpirationTimeInHours = viewExpirationTimeInHours;
+ this.maxReadRowsRetries = maxReadRowsRetries;
+ }
+}
diff --git a/presto-bigquery/src/test/java/io/prestosql/plugin/bigquery/BigQueryQueryRunner.java b/presto-bigquery/src/test/java/io/prestosql/plugin/bigquery/BigQueryQueryRunner.java
new file mode 100644
index 00000000000..d8efc5d3a8f
--- /dev/null
+++ b/presto-bigquery/src/test/java/io/prestosql/plugin/bigquery/BigQueryQueryRunner.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import io.airlift.log.Logger;
+import io.airlift.log.Logging;
+import io.prestosql.Session;
+import io.prestosql.plugin.tpch.TpchPlugin;
+import io.prestosql.testing.DistributedQueryRunner;
+
+import static io.airlift.testing.Closeables.closeAllSuppress;
+import static io.prestosql.testing.TestingSession.testSessionBuilder;
+
+public class BigQueryQueryRunner
+{
+ private static final String TPCH_SCHEMA = "tpch";
+
+ private BigQueryQueryRunner() {}
+
+ public static DistributedQueryRunner createQueryRunner()
+ throws Exception
+ {
+ DistributedQueryRunner queryRunner = null;
+ try {
+ queryRunner = DistributedQueryRunner.builder(createSession()).build();
+
+ queryRunner.installPlugin(new TpchPlugin());
+ queryRunner.createCatalog("tpch", "tpch");
+
+ queryRunner.installPlugin(new BigQueryPlugin());
+ queryRunner.createCatalog("bigquery", "bigquery");
+
+ return queryRunner;
+ }
+ catch (Throwable e) {
+ closeAllSuppress(e, queryRunner);
+ throw e;
+ }
+ }
+
+ public static Session createSession()
+ {
+ return testSessionBuilder()
+ .setCatalog("bigquery")
+ .setSchema(TPCH_SCHEMA)
+ .build();
+ }
+
+ public static void main(String[] args)
+ throws Exception
+ {
+ Logging.initialize();
+ DistributedQueryRunner queryRunner = createQueryRunner();
+ Thread.sleep(10);
+ Logger log = Logger.get(BigQueryQueryRunner.class);
+ log.info("======== SERVER STARTED ========");
+ log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
+ }
+}
diff --git a/presto-bigquery/src/test/java/io/prestosql/plugin/bigquery/MockResponsesBatch.java b/presto-bigquery/src/test/java/io/prestosql/plugin/bigquery/MockResponsesBatch.java
new file mode 100644
index 00000000000..945673c147e
--- /dev/null
+++ b/presto-bigquery/src/test/java/io/prestosql/plugin/bigquery/MockResponsesBatch.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.plugin.bigquery;
+
+import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+class MockResponsesBatch
+ implements Iterator
+{
+ private Queue