Skip to content

Commit

Permalink
Add temp location to batch file loads
Browse files Browse the repository at this point in the history
  • Loading branch information
Claude committed Nov 14, 2024
1 parent ab5c069 commit 6151f5c
Showing 1 changed file with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
Expand Down Expand Up @@ -98,6 +99,10 @@ public void testBatchFileLoadsWriteRead() {
String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName());
Map<String, Object> config = ImmutableMap.of("table", table);

// file loads requires a GCS temp location
String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
writePipeline.getOptions().setTempLocation(tempLocation);

// batch write
PCollectionRowTuple.of("input", getInput(writePipeline, false))
.apply(Managed.write(Managed.BIGQUERY).withConfig(config));
Expand Down Expand Up @@ -146,6 +151,12 @@ public void testDynamicDestinations(boolean streaming) throws IOException, Inter
Map<String, Object> config =
ImmutableMap.of("table", destinationTemplate, "drop", Collections.singletonList("dest"));

if (!streaming) {
// file loads requires a GCS temp location
String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot();
writePipeline.getOptions().setTempLocation(tempLocation);
}

// write
PCollectionRowTuple.of("input", getInput(writePipeline, streaming))
.apply(Managed.write(Managed.BIGQUERY).withConfig(config));
Expand Down

0 comments on commit 6151f5c

Please sign in to comment.