BigQuery testing suite that runs against BQ's day 0 region (#28397)
* support creating datasets in a specified region

* new Gradle task

* create YAML workflow

* add file loads streaming test; add option to query with location

* pass BQ location to query operation
ahmedabu98 authored Oct 3, 2023
1 parent c01b41f commit be80537
Showing 18 changed files with 341 additions and 64 deletions.
97 changes: 97 additions & 0 deletions .github/workflows/beam_PostCommit_Java_BigQueryEarlyRollout.yml
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PostCommit Java BigQueryEarlyRollout

on:
  issue_comment:
    types: [created]
  schedule:
    - cron: '0 */6 * * *'
  workflow_dispatch:

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.body || github.event.sender.login}}'
  cancel-in-progress: true

# Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
  actions: write
  pull-requests: read
  checks: write
  contents: read
  deployments: read
  id-token: none
  issues: read
  discussions: read
  packages: read
  pages: read
  repository-projects: read
  security-events: read
  statuses: read

env:
  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
  beam_PostCommit_Java_BigQueryEarlyRollout:
    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
    runs-on: [self-hosted, ubuntu-20.04, main]
    timeout-minutes: 100
    strategy:
      matrix:
        job_name: [beam_PostCommit_Java_BigQueryEarlyRollout]
        job_phrase: [Run Java BigQueryEarlyRollout PostCommit]
    if: |
      github.event_name == 'workflow_dispatch' ||
      github.event_name == 'schedule' ||
      github.event.comment.body == 'Run Java BigQueryEarlyRollout PostCommit'
    steps:
      - uses: actions/checkout@v4
      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
      - name: Authenticate on GCP
        uses: google-github-actions/setup-gcloud@v0
        with:
          service_account_email: ${{ secrets.GCP_SA_EMAIL }}
          service_account_key: ${{ secrets.GCP_SA_KEY }}
          project_id: ${{ secrets.GCP_PROJECT_ID }}
          export_default_credentials: true
      - name: run PostCommit Java BigQueryEarlyRollout script
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:java:io:google-cloud-platform:bigQueryEarlyRolloutIntegrationTest
      - name: Archive JUnit Test Results
        uses: actions/upload-artifact@v3
        if: failure()
        with:
          name: JUnit Test Results
          path: "**/build/reports/tests/"
      - name: Publish JUnit Test Results
        uses: EnricoMi/publish-unit-test-result-action@v2
        if: always()
        with:
          commit: '${{ env.prsha || env.GITHUB_SHA }}'
          comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
          files: '**/build/test-results/**/*.xml'
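As configured above, the suite runs every six hours on the cron schedule, can be started manually with workflow_dispatch, and can be triggered on a pull request by commenting the phrase Run Java BigQueryEarlyRollout PostCommit; the concurrency group lets a newly queued run cancel a superseded one.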
44 changes: 42 additions & 2 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -202,10 +202,8 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
  exclude '**/BigQueryIOReadIT.class'
  exclude '**/BigQueryIOStorageQueryIT.class'
  exclude '**/BigQueryIOStorageReadIT.class'
  exclude '**/BigQueryIOStorageReadTableRowIT.class'
  exclude '**/BigQueryIOStorageWriteIT.class'
  exclude '**/BigQueryToTableIT.class'
  exclude '**/BigQueryIOJsonTest.class'

  maxParallelForks 4
  classpath = sourceSets.test.runtimeClasspath
@@ -244,6 +242,48 @@ task integrationTestKms(type: Test) {
}
}

/*
 Integration tests for BigQueryIO that run against BigQuery's early rollout region (us-east7),
 with the intended purpose of catching breaking changes from new BigQuery releases early.
 If these tests fail here but not in `Java_GCP_IO_Direct`, a new BigQuery change may be
 breaking the connector; if so, we should verify with the appropriate BigQuery
 infrastructure API team.
 To test against a particular BigQuery location, we only need to create our datasets
 in that location.
*/
task bigQueryEarlyRolloutIntegrationTest(type: Test, dependsOn: processTestResources) {
  group = "Verification"
  def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
  def gcpTempRoot = project.findProperty('gcpTempRoot') ?: 'gs://temp-storage-for-bigquery-day0-tests'
  systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
      "--runner=DirectRunner",
      "--project=${gcpProject}",
      "--tempRoot=${gcpTempRoot}",
      "--bigQueryLocation=us-east7",
  ])

  outputs.upToDateWhen { false }

  // export and direct read
  include '**/BigQueryToTableIT.class'
  include '**/BigQueryIOJsonIT.class'
  include '**/BigQueryIOStorageReadTableRowIT.class'
  // storage write api
  include '**/StorageApiDirectWriteProtosIT.class'
  include '**/StorageApiSinkFailedRowsIT.class'
  include '**/StorageApiSinkRowUpdateIT.class'
  include '**/StorageApiSinkSchemaUpdateIT.class'
  include '**/TableRowToStorageApiProtoIT.class'
  // file loads
  include '**/BigQuerySchemaUpdateOptionsIT.class'
  include '**/BigQueryTimePartitioningClusteringIT.class'
  include '**/FileLoadsStreamingIT.class'

  maxParallelForks 4
  classpath = sourceSets.test.runtimeClasspath
  testClassesDirs = sourceSets.test.output.classesDirs
}

// path(s) for Cloud Spanner related classes
def spannerIncludes = [
'**/org/apache/beam/sdk/io/gcp/spanner/**',
TestBigQueryOptions.java
@@ -24,10 +24,17 @@

/** {@link TestPipelineOptions} for {@link TestBigQuery}. */
public interface TestBigQueryOptions extends TestPipelineOptions, BigQueryOptions, GcpOptions {
  String BIGQUERY_EARLY_ROLLOUT_REGION = "us-east7";

  @Description("Dataset used in the integration tests. Default is integ_test")
  @Default.String("integ_test")
  String getTargetDataset();

  void setTargetDataset(String value);

  @Description("Region to perform BigQuery operations in.")
  @Default.String("")
  String getBigQueryLocation();

  void setBigQueryLocation(String location);
}
GcpIoPipelineOptionsRegistrar.java
@@ -20,6 +20,7 @@
import com.google.auto.service.AutoService;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -36,6 +37,7 @@ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
        .add(BigQueryOptions.class)
        .add(PubsubOptions.class)
        .add(FirestoreOptions.class)
        .add(TestBigQueryOptions.class)
        .build();
  }
}
BigqueryClient.java
@@ -292,6 +292,21 @@ private QueryResponse getTypedTableRows(QueryResponse response) {
  public List<TableRow> queryUnflattened(
      String query, String projectId, boolean typed, boolean useStandardSql)
      throws IOException, InterruptedException {
    return queryUnflattened(query, projectId, typed, useStandardSql, null);
  }

  /**
   * Performs a query without flattening results. May choose a location (GCP region) to perform
   * this operation in.
   */
  @Nonnull
  public List<TableRow> queryUnflattened(
      String query,
      String projectId,
      boolean typed,
      boolean useStandardSql,
      @Nullable String location)
      throws IOException, InterruptedException {
    Random rnd = new Random(System.currentTimeMillis());
    String temporaryDatasetId =
        String.format(
            "_dataflow_temporary_dataset_%s_%s", System.nanoTime(), rnd.nextInt(1000000));
@@ -302,9 +317,11 @@ public List<TableRow> queryUnflattened(
            .setDatasetId(temporaryDatasetId)
            .setTableId(temporaryTableId);

-   createNewDataset(projectId, temporaryDatasetId);
+   createNewDataset(projectId, temporaryDatasetId, null, location);
    createNewTable(
-       projectId, temporaryDatasetId, new Table().setTableReference(tempTableReference));
+       projectId,
+       temporaryDatasetId,
+       new Table().setTableReference(tempTableReference).setLocation(location));

    JobConfigurationQuery jcQuery =
        new JobConfigurationQuery()
@@ -325,6 +342,7 @@
          bqClient
              .jobs()
              .getQueryResults(projectId, insertedJob.getJobReference().getJobId())
+             .setLocation(location)
              .execute();

    } while (!qResponse.getJobComplete());
@@ -395,6 +413,18 @@ public void createNewDataset(String projectId, String datasetId)
  public void createNewDataset(
      String projectId, String datasetId, @Nullable Long defaultTableExpirationMs)
      throws IOException, InterruptedException {
    createNewDataset(projectId, datasetId, defaultTableExpirationMs, null);
  }

  /**
   * Creates a new dataset with defaultTableExpirationMs and in a specified location (GCP region).
   */
  public void createNewDataset(
      String projectId,
      String datasetId,
      @Nullable Long defaultTableExpirationMs,
      @Nullable String location)
      throws IOException, InterruptedException {
    Sleeper sleeper = Sleeper.DEFAULT;
    BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
    IOException lastException = null;
@@ -410,7 +440,8 @@ public void createNewDataset(
                    projectId,
                    new Dataset()
                        .setDatasetReference(new DatasetReference().setDatasetId(datasetId))
-                       .setDefaultTableExpirationMs(defaultTableExpirationMs))
+                       .setDefaultTableExpirationMs(defaultTableExpirationMs)
+                       .setLocation(location))
                .execute();
        if (response != null) {
          LOG.info("Successfully created new dataset : " + response.getId());
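Together, the two overloads let a test pin both its scratch dataset and its query job to one region. A minimal sketch of the call pattern, assuming an existing BigqueryClient instance named client; the project, dataset, and query values are placeholders, and both calls throw IOException and InterruptedException:

// Sketch: create a dataset in the early rollout region, then run a query job there.
client.createNewDataset("my-project", "my_dataset", null, "us-east7");
List<TableRow> rows =
    client.queryUnflattened(
        "SELECT 1", "my-project", /* typed= */ true, /* useStandardSql= */ true, "us-east7");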
BigQueryIOStorageQueryIT.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION;

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
@@ -52,7 +54,13 @@ public class BigQueryIOStorageQueryIT {
"1G", 11110839L,
"1T", 11110839000L);

private static final String DATASET_ID = "big_query_storage";
private static final String DATASET_ID =
TestPipeline.testingPipelineOptions()
.as(TestBigQueryOptions.class)
.getBigQueryLocation()
.equals(BIGQUERY_EARLY_ROLLOUT_REGION)
? "big_query_storage_day0"
: "big_query_storage";
private static final String TABLE_PREFIX = "storage_read_";

private BigQueryIOStorageQueryOptions options;
Expand Down
BigQueryIOStorageReadIT.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION;
import static org.junit.Assert.assertEquals;

import com.google.cloud.bigquery.storage.v1.DataFormat;
@@ -65,7 +66,13 @@ public class BigQueryIOStorageReadIT {
"1T", 11110839000L,
"multi_field", 11110839L);

private static final String DATASET_ID = "big_query_storage";
private static final String DATASET_ID =
TestPipeline.testingPipelineOptions()
.as(TestBigQueryOptions.class)
.getBigQueryLocation()
.equals(BIGQUERY_EARLY_ROLLOUT_REGION)
? "big_query_storage_day0"
: "big_query_storage";
private static final String TABLE_PREFIX = "storage_read_";

private BigQueryIOStorageReadOptions options;
Expand Down