Skip to content

Commit

Permalink
use standard sql (#33278)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 authored Dec 4, 2024
1 parent d6cad34 commit 2f788f6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ BigQueryIO.TypedRead<TableRow> createDirectReadTransform() {
read = read.withSelectedFields(configuration.getSelectedFields());
}
} else {
read = read.fromQuery(configuration.getQuery());
read = read.fromQuery(configuration.getQuery()).usingStandardSql();
}
if (!Strings.isNullOrEmpty(configuration.getKmsKey())) {
read = read.withKmsKey(configuration.getKmsKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,24 @@ public static void cleanup() {
@Test
public void testBatchFileLoadsWriteRead() {
String table =
String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName());
Map<String, Object> config = ImmutableMap.of("table", table);
String.format("%s.%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName());
Map<String, Object> writeConfig = ImmutableMap.of("table", table);

// file loads requires a GCS temp location
String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
writePipeline.getOptions().setTempLocation(tempLocation);

// batch write
PCollectionRowTuple.of("input", getInput(writePipeline, false))
.apply(Managed.write(Managed.BIGQUERY).withConfig(config));
.apply(Managed.write(Managed.BIGQUERY).withConfig(writeConfig));
writePipeline.run().waitUntilFinish();

Map<String, Object> readConfig =
ImmutableMap.of("query", String.format("SELECT * FROM `%s`", table));
// read and validate
PCollection<Row> outputRows =
readPipeline
.apply(Managed.read(Managed.BIGQUERY).withConfig(config))
.apply(Managed.read(Managed.BIGQUERY).withConfig(readConfig))
.getSinglePCollection();
PAssert.that(outputRows).containsInAnyOrder(ROWS);
readPipeline.run().waitUntilFinish();
Expand Down

0 comments on commit 2f788f6

Please sign in to comment.