From 2f788f674f377f857748c761abe98397038a8979 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Wed, 4 Dec 2024 11:46:47 -0500 Subject: [PATCH] use standard sql (#33278) --- .../BigQueryDirectReadSchemaTransformProvider.java | 2 +- .../io/gcp/bigquery/providers/BigQueryManagedIT.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java index 15b1b01d7f6c..073de40038b3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryDirectReadSchemaTransformProvider.java @@ -233,7 +233,7 @@ BigQueryIO.TypedRead createDirectReadTransform() { read = read.withSelectedFields(configuration.getSelectedFields()); } } else { - read = read.fromQuery(configuration.getQuery()); + read = read.fromQuery(configuration.getQuery()).usingStandardSql(); } if (!Strings.isNullOrEmpty(configuration.getKmsKey())) { read = read.withKmsKey(configuration.getKmsKey()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index 16ce2f049dcf..6a422f1832d8 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -96,8 +96,8 @@ public static void cleanup() { @Test public void testBatchFileLoadsWriteRead() { String table = - String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); - Map config = ImmutableMap.of("table", table); + String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); + Map writeConfig = ImmutableMap.of("table", table); // file loads requires a GCS temp location String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); @@ -105,13 +105,15 @@ public void testBatchFileLoadsWriteRead() { // batch write PCollectionRowTuple.of("input", getInput(writePipeline, false)) - .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); + .apply(Managed.write(Managed.BIGQUERY).withConfig(writeConfig)); writePipeline.run().waitUntilFinish(); + Map readConfig = + ImmutableMap.of("query", String.format("SELECT * FROM `%s`", table)); // read and validate PCollection outputRows = readPipeline - .apply(Managed.read(Managed.BIGQUERY).withConfig(config)) + .apply(Managed.read(Managed.BIGQUERY).withConfig(readConfig)) .getSinglePCollection(); PAssert.that(outputRows).containsInAnyOrder(ROWS); readPipeline.run().waitUntilFinish();