diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 32ee29738bf8..23acd8e01f7f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -419,16 +419,15 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
             .withSideInputs(sideInputsForUpdateSchema))
         .apply(
             "WriteRenameTriggered",
-            ParDo.of(
-                    new WriteRename(
-                        bigQueryServices,
-                        copyJobIdPrefixView,
-                        writeDisposition,
-                        createDisposition,
-                        maxRetryJobs,
-                        kmsKey,
-                        loadJobProjectId))
-                .withSideInputs(copyJobIdPrefixView));
+            new WriteRename(
+                bigQueryServices,
+                copyJobIdPrefixView,
+                writeDisposition,
+                createDisposition,
+                maxRetryJobs,
+                kmsKey,
+                loadJobProjectId,
+                copyJobIdPrefixView));

     PCollection<TableDestination> successfulSinglePartitionWrites =
         writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView)
@@ -517,16 +516,15 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
             .withSideInputs(sideInputsForUpdateSchema))
         .apply(
             "WriteRenameUntriggered",
-            ParDo.of(
-                    new WriteRename(
-                        bigQueryServices,
-                        copyJobIdPrefixView,
-                        writeDisposition,
-                        createDisposition,
-                        maxRetryJobs,
-                        kmsKey,
-                        loadJobProjectId))
-                .withSideInputs(copyJobIdPrefixView))
+            new WriteRename(
+                bigQueryServices,
+                copyJobIdPrefixView,
+                writeDisposition,
+                createDisposition,
+                maxRetryJobs,
+                kmsKey,
+                loadJobProjectId,
+                copyJobIdPrefixView))
             .setCoder(tableDestinationCoder);

     PCollectionList<TableDestination> allSuccessfulWrites =
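Note on the BatchLoads call sites above: a composite PTransform, unlike ParDo, exposes no withSideInputs() method, which is why copyJobIdPrefixView now rides in through the WriteRename constructor and gets attached to the inner ParDo during expansion. A minimal sketch of that pattern follows; the AddPrefix/prefixView names are illustrative stand-ins, not part of this patch:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // A composite transform that needs a side input must receive the view up front
    // and wire it to its inner ParDo itself.
    class AddPrefix extends PTransform<PCollection<String>, PCollection<String>> {
      private final PCollectionView<String> prefixView;

      AddPrefix(PCollectionView<String> prefixView) {
        this.prefixView = prefixView; // the view arrives via the constructor...
      }

      @Override
      public PCollection<String> expand(PCollection<String> input) {
        return input.apply(
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        c.output(c.sideInput(prefixView) + c.element());
                      }
                    })
                .withSideInputs(prefixView)); // ...and is declared on the inner ParDo here
      }
    }
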
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index f4120a013ed6..c4ad09ce6eab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -32,6 +32,7 @@
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import java.io.IOException;
+import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -168,7 +169,7 @@ private static boolean nextBackOff(Sleeper sleeper, BackOff backOff)
     }
   }

-  static class PendingJob {
+  static class PendingJob implements Serializable {
     private final SerializableFunction<RetryJobId, Void> executeJob;
     private final SerializableFunction<RetryJobId, Job> pollJob;
     private final SerializableFunction<RetryJobId, Job> lookupJob;
@@ -275,7 +276,7 @@ boolean shouldRetry() {
     }
   }

-  static class RetryJobId {
+  static class RetryJobId implements Serializable {
     private final String jobIdPrefix;
     private final int retryIndex;

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index 9d798b397070..a7177613c60d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -22,6 +22,7 @@
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.TableReference;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -32,13 +33,19 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ArrayListMultimap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -52,8 +59,9 @@
  * provides the list of all temporary tables created for a given {@link TableDestination}.
  */
 class WriteRename
-    extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, TableDestination> {
-  private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
+    extends PTransform<
+        PCollection<Iterable<KV<TableDestination, WriteTables.Result>>>,
+        PCollection<TableDestination>> {

   private final BigQueryServices bqServices;
   private final PCollectionView<String> jobIdToken;
@@ -66,27 +74,7 @@ class WriteRename
   private final int maxRetryJobs;
   private final @Nullable String kmsKey;
   private final @Nullable ValueProvider<String> loadJobProjectId;
-  private transient @Nullable DatasetService datasetService;
-
-  private static class PendingJobData {
-    final BigQueryHelpers.PendingJob retryJob;
-    final TableDestination tableDestination;
-    final List<TableReference> tempTables;
-    final BoundedWindow window;
-
-    public PendingJobData(
-        BigQueryHelpers.PendingJob retryJob,
-        TableDestination tableDestination,
-        List<TableReference> tempTables,
-        BoundedWindow window) {
-      this.retryJob = retryJob;
-      this.tableDestination = tableDestination;
-      this.tempTables = tempTables;
-      this.window = window;
-    }
-  }
-  // All pending copy jobs.
-  private List<PendingJobData> pendingJobs = Lists.newArrayList();
+  private final PCollectionView<String> copyJobIdPrefixView;

   public WriteRename(
       BigQueryServices bqServices,
@@ -95,7 +83,8 @@ public WriteRename(
       CreateDisposition createDisposition,
       int maxRetryJobs,
       @Nullable String kmsKey,
-      @Nullable ValueProvider<String> loadJobProjectId) {
+      @Nullable ValueProvider<String> loadJobProjectId,
+      PCollectionView<String> copyJobIdPrefixView) {
     this.bqServices = bqServices;
     this.jobIdToken = jobIdToken;
     this.firstPaneWriteDisposition = writeDisposition;
@@ -103,229 +92,368 @@ public WriteRename(
     this.maxRetryJobs = maxRetryJobs;
     this.kmsKey = kmsKey;
     this.loadJobProjectId = loadJobProjectId;
+    this.copyJobIdPrefixView = copyJobIdPrefixView;
   }

-  @StartBundle
-  public void startBundle(StartBundleContext c) {
-    pendingJobs.clear();
+  @Override
+  public PCollection<TableDestination> expand(
+      PCollection<Iterable<KV<TableDestination, WriteTables.Result>>> input) {
+    return input
+        .apply(
+            "WriteRename",
+            ParDo.of(
+                    new WriteRenameFn(
+                        bqServices,
+                        jobIdToken,
+                        firstPaneWriteDisposition,
+                        firstPaneCreateDisposition,
+                        maxRetryJobs,
+                        kmsKey,
+                        loadJobProjectId))
+                .withSideInputs(copyJobIdPrefixView))
+        // We apply a fusion break here to ensure that on retries, the temp table renaming won't
+        // attempt to rename a temp table that was previously deleted in TempTableCleanupFn
+        .apply(Reshuffle.viaRandomKey())
+        .apply("RemoveTempTables", ParDo.of(new TempTableCleanupFn(bqServices)))
+        .setCoder(TableDestinationCoder.of());
   }

-  @Teardown
-  public void onTeardown() {
-    try {
-      if (datasetService != null) {
-        datasetService.close();
-        datasetService = null;
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+  public static class PendingJobData implements Serializable {
+
+    final BigQueryHelpers.PendingJob retryJob;
+    final TableDestination tableDestination;
+    final List<String> tempTables;
+    final BoundedWindow window;
+
+    public PendingJobData(
+        BigQueryHelpers.PendingJob retryJob,
+        TableDestination tableDestination,
+        List<String> tempTables,
+        BoundedWindow window) {
+      this.retryJob = retryJob;
+      this.tableDestination = tableDestination;
+      this.tempTables = tempTables;
+      this.window = window;
     }
   }

-  @ProcessElement
-  public void processElement(
-      @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
-      ProcessContext c,
-      BoundedWindow window)
-      throws Exception {
-    Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
-    for (KV<TableDestination, WriteTables.Result> entry : element) {
-      tempTables.put(entry.getKey(), entry.getValue());
+  public static class WriteRenameFn
+      extends DoFn<
+          Iterable<KV<TableDestination, WriteTables.Result>>,
+          KV<TableDestination, List<String>>> {
+    private static final Logger LOG = LoggerFactory.getLogger(WriteRenameFn.class);
+
+    private final BigQueryServices bqServices;
+    private final PCollectionView<String> jobIdToken;
+
+    // In the triggered scenario, the user-supplied create and write dispositions only apply to the
+    // first trigger pane, as that's when the table is created. Subsequent loads should always
+    // append to the table, and so use CREATE_NEVER and WRITE_APPEND dispositions respectively.
+    private final WriteDisposition firstPaneWriteDisposition;
+    private final CreateDisposition firstPaneCreateDisposition;
+    private final int maxRetryJobs;
+    private final @Nullable String kmsKey;
+    private final @Nullable ValueProvider<String> loadJobProjectId;
+    private transient @Nullable DatasetService datasetService;
+
+    // All pending copy jobs.
+    private List<PendingJobData> pendingJobs = Lists.newArrayList();
+
+    public WriteRenameFn(
+        BigQueryServices bqServices,
+        PCollectionView<String> jobIdToken,
+        WriteDisposition writeDisposition,
+        CreateDisposition createDisposition,
+        int maxRetryJobs,
+        @Nullable String kmsKey,
+        @Nullable ValueProvider<String> loadJobProjectId) {
+      this.bqServices = bqServices;
+      this.jobIdToken = jobIdToken;
+      this.firstPaneWriteDisposition = writeDisposition;
+      this.firstPaneCreateDisposition = createDisposition;
+      this.maxRetryJobs = maxRetryJobs;
+      this.kmsKey = kmsKey;
+      this.loadJobProjectId = loadJobProjectId;
+    }
+
+    @StartBundle
+    public void startBundle(StartBundleContext c) {
+      pendingJobs.clear();
     }
-    for (Map.Entry<TableDestination, Collection<WriteTables.Result>> entry :
-        tempTables.asMap().entrySet()) {
-      // Process each destination table.
-      // Do not copy if no temp tables are provided.
-      if (!entry.getValue().isEmpty()) {
-        pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
+
+    @Teardown
+    public void onTeardown() {
+      try {
+        if (datasetService != null) {
+          datasetService.close();
+          datasetService = null;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element Iterable<KV<TableDestination, WriteTables.Result>> element,
+        ProcessContext c,
+        BoundedWindow window)
+        throws Exception {
+      Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
+      for (KV<TableDestination, WriteTables.Result> entry : element) {
+        tempTables.put(entry.getKey(), entry.getValue());
+      }
+      for (Map.Entry<TableDestination, Collection<WriteTables.Result>> entry :
+          tempTables.asMap().entrySet()) {
+        // Process each destination table.
+        // Do not copy if no temp tables are provided.
+        if (!entry.getValue().isEmpty()) {
+          pendingJobs.add(startWriteRename(entry.getKey(), entry.getValue(), c, window));
+        }
       }
     }
-  }

-  @FinishBundle
-  public void finishBundle(FinishBundleContext c) throws Exception {
-    DatasetService datasetService =
-        getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
-    PendingJobManager jobManager = new PendingJobManager();
-    for (PendingJobData pendingJob : pendingJobs) {
-      jobManager.addPendingJob(
-          pendingJob.retryJob,
-          j -> {
-            try {
-              if (pendingJob.tableDestination.getTableDescription() != null) {
-                TableReference ref = pendingJob.tableDestination.getTableReference();
-                datasetService.patchTableDescription(
-                    ref.clone()
-                        .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
-                    pendingJob.tableDestination.getTableDescription());
-              }
-              c.output(
-                  pendingJob.tableDestination, pendingJob.window.maxTimestamp(), pendingJob.window);
-              removeTemporaryTables(datasetService, pendingJob.tempTables);
-              return null;
-            } catch (IOException | InterruptedException e) {
-              return e;
-            }
-          });
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) throws Exception {
+      DatasetService datasetService =
+          getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
+      PendingJobManager jobManager = new PendingJobManager();
+      for (PendingJobData pendingJob : pendingJobs) {
+        jobManager.addPendingJob(
+            pendingJob.retryJob,
+            j -> {
+              try {
+                if (pendingJob.tableDestination.getTableDescription() != null) {
+                  TableReference ref = pendingJob.tableDestination.getTableReference();
+                  datasetService.patchTableDescription(
+                      ref.clone()
+                          .setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
+                      pendingJob.tableDestination.getTableDescription());
+                }
+                c.output(
+                    KV.of(pendingJob.tableDestination, pendingJob.tempTables),
+                    pendingJob.window.maxTimestamp(),
+                    pendingJob.window);
+                return null;
+              } catch (IOException | InterruptedException e) {
+                return e;
+              }
+            });
+      }
+      jobManager.waitForDone();
     }
-    jobManager.waitForDone();
-  }

-  private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
-    if (datasetService == null) {
-      datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+    private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
+      if (datasetService == null) {
+        datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+      }
+      return datasetService;
     }
-    return datasetService;
-  }

-  private PendingJobData startWriteRename(
-      TableDestination finalTableDestination,
-      Iterable<WriteTables.Result> tempTableNames,
-      ProcessContext c,
-      BoundedWindow window)
-      throws Exception {
-    // The pane may have advanced either here due to triggering or due to an upstream trigger. We
-    // check the upstream
-    // trigger to handle the case where an earlier pane triggered the single-partition path. If this
-    // happened, then the
-    // table will already exist so we want to append to the table.
-    WriteTables.@Nullable Result firstTempTable = Iterables.getFirst(tempTableNames, null);
-    boolean isFirstPane =
-        firstTempTable != null && firstTempTable.isFirstPane() && c.pane().isFirst();
-    WriteDisposition writeDisposition =
-        isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
-    CreateDisposition createDisposition =
-        isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
-    List<TableReference> tempTables =
-        StreamSupport.stream(tempTableNames.spliterator(), false)
-            .map(
-                result ->
-                    BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class))
-            .collect(Collectors.toList());
-
-    // Make sure each destination table gets a unique job id.
-    String jobIdPrefix =
-        BigQueryResourceNaming.createJobIdWithDestination(
-            c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
-
-    BigQueryHelpers.PendingJob retryJob =
-        startCopy(
-            bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
-            jobIdPrefix,
-            finalTableDestination.getTableReference(),
-            tempTables,
-            writeDisposition,
-            createDisposition,
-            kmsKey,
-            loadJobProjectId);
-    return new PendingJobData(retryJob, finalTableDestination, tempTables, window);
-  }
+    private PendingJobData startWriteRename(
+        TableDestination finalTableDestination,
+        Iterable<WriteTables.Result> tempTableNames,
+        ProcessContext c,
+        BoundedWindow window)
+        throws Exception {
+      // The pane may have advanced either here due to triggering or due to an upstream trigger.
+      // We check the upstream trigger to handle the case where an earlier pane triggered the
+      // single-partition path. If this happened, then the table will already exist so we want to
+      // append to the table.
+      WriteTables.@Nullable Result firstTempTable = Iterables.getFirst(tempTableNames, null);
+      boolean isFirstPane =
+          firstTempTable != null && firstTempTable.isFirstPane() && c.pane().isFirst();
+      WriteDisposition writeDisposition =
+          isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+      CreateDisposition createDisposition =
+          isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+      List<TableReference> tempTables =
+          StreamSupport.stream(tempTableNames.spliterator(), false)
+              .map(
+                  result ->
+                      BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class))
+              .collect(Collectors.toList());
+
+      // We maintain string versions of the temp tables in order to make PendingJobData
+      // serializable.
+      List<String> tempTableStrings =
+          StreamSupport.stream(tempTableNames.spliterator(), false)
+              .map(Result::getTableName)
+              .collect(Collectors.toList());
+
+      // Make sure each destination table gets a unique job id.
+      String jobIdPrefix =
+          BigQueryResourceNaming.createJobIdWithDestination(
+              c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());
+
+      BigQueryHelpers.PendingJob retryJob =
+          startCopy(
+              bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
+              getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
+              jobIdPrefix,
+              finalTableDestination.getTableReference(),
+              tempTables,
+              writeDisposition,
+              createDisposition,
+              kmsKey,
+              loadJobProjectId);
+      return new PendingJobData(retryJob, finalTableDestination, tempTableStrings, window);
+    }

-  private BigQueryHelpers.PendingJob startCopy(
-      JobService jobService,
-      DatasetService datasetService,
-      String jobIdPrefix,
-      TableReference ref,
-      List<TableReference> tempTables,
-      WriteDisposition writeDisposition,
-      CreateDisposition createDisposition,
-      @Nullable String kmsKey,
-      @Nullable ValueProvider<String> loadJobProjectId) {
-    JobConfigurationTableCopy copyConfig =
-        new JobConfigurationTableCopy()
-            .setSourceTables(tempTables)
-            .setDestinationTable(ref)
-            .setWriteDisposition(writeDisposition.name())
-            .setCreateDisposition(createDisposition.name());
-    if (kmsKey != null) {
-      copyConfig.setDestinationEncryptionConfiguration(
-          new EncryptionConfiguration().setKmsKeyName(kmsKey));
-    }
-
-    String bqLocation =
-        BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());
-
-    String projectId =
-        loadJobProjectId == null || loadJobProjectId.get() == null
-            ? ref.getProjectId()
-            : loadJobProjectId.get();
-    BigQueryHelpers.PendingJob retryJob =
-        new BigQueryHelpers.PendingJob(
-            jobId -> {
-              JobReference jobRef =
-                  new JobReference()
-                      .setProjectId(projectId)
-                      .setJobId(jobId.getJobId())
-                      .setLocation(bqLocation);
-              LOG.info(
-                  "Starting copy job for table {} using {}, job id iteration {}",
-                  ref,
-                  jobRef,
-                  jobId.getRetryIndex());
-              try {
-                jobService.startCopyJob(jobRef, copyConfig);
-              } catch (IOException | InterruptedException e) {
-                LOG.warn("Copy job {} failed.", jobRef, e);
-                throw new RuntimeException(e);
-              }
-              return null;
-            },
-            // Function to poll the result of a load job.
-            jobId -> {
-              JobReference jobRef =
-                  new JobReference()
-                      .setProjectId(projectId)
-                      .setJobId(jobId.getJobId())
-                      .setLocation(bqLocation);
-              try {
-                return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
-              } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-            },
-            // Function to lookup a job.
-            jobId -> {
-              JobReference jobRef =
-                  new JobReference()
-                      .setProjectId(projectId)
-                      .setJobId(jobId.getJobId())
-                      .setLocation(bqLocation);
-              try {
-                return jobService.getJob(jobRef);
-              } catch (InterruptedException | IOException e) {
-                throw new RuntimeException(e);
-              }
-            },
-            maxRetryJobs,
-            jobIdPrefix);
-    return retryJob;
-  }

+    private BigQueryHelpers.PendingJob startCopy(
+        JobService jobService,
+        DatasetService datasetService,
+        String jobIdPrefix,
+        TableReference ref,
+        List<TableReference> tempTables,
+        WriteDisposition writeDisposition,
+        CreateDisposition createDisposition,
+        @Nullable String kmsKey,
+        @Nullable ValueProvider<String> loadJobProjectId) {
+      JobConfigurationTableCopy copyConfig =
+          new JobConfigurationTableCopy()
+              .setSourceTables(tempTables)
+              .setDestinationTable(ref)
+              .setWriteDisposition(writeDisposition.name())
+              .setCreateDisposition(createDisposition.name());
+      if (kmsKey != null) {
+        copyConfig.setDestinationEncryptionConfiguration(
+            new EncryptionConfiguration().setKmsKeyName(kmsKey));
+      }
+
+      String bqLocation =
+          BigQueryHelpers.getDatasetLocation(
+              datasetService, ref.getProjectId(), ref.getDatasetId());
+
+      String projectId =
+          loadJobProjectId == null || loadJobProjectId.get() == null
+              ? ref.getProjectId()
+              : loadJobProjectId.get();
+      BigQueryHelpers.PendingJob retryJob =
+          new BigQueryHelpers.PendingJob(
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(projectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                LOG.info(
+                    "Starting copy job for table {} using {}, job id iteration {}",
+                    ref,
+                    jobRef,
+                    jobId.getRetryIndex());
+                try {
+                  jobService.startCopyJob(jobRef, copyConfig);
+                } catch (IOException | InterruptedException e) {
+                  LOG.warn("Copy job {} failed.", jobRef, e);
+                  throw new RuntimeException(e);
+                }
+                return null;
+              },
+              // Function to poll the result of a load job.
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(projectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+                } catch (InterruptedException e) {
+                  throw new RuntimeException(e);
+                }
+              },
+              // Function to lookup a job.
+              jobId -> {
+                JobReference jobRef =
+                    new JobReference()
+                        .setProjectId(projectId)
+                        .setJobId(jobId.getJobId())
+                        .setLocation(bqLocation);
+                try {
+                  return jobService.getJob(jobRef);
+                } catch (InterruptedException | IOException e) {
+                  throw new RuntimeException(e);
+                }
+              },
+              maxRetryJobs,
+              jobIdPrefix);
+      return retryJob;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder
+          .add(
+              DisplayData.item("firstPaneWriteDisposition", firstPaneWriteDisposition.toString())
+                  .withLabel("Write Disposition"))
+          .add(
+              DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString())
+                  .withLabel("Create Disposition"))
+          .add(
+              DisplayData.item("launchesBigQueryJobs", true)
+                  .withLabel("This transform launches BigQuery jobs to read/write elements."));
+    }
+  }

-  static void removeTemporaryTables(DatasetService tableService, List<TableReference> tempTables) {
-    for (TableReference tableRef : tempTables) {
-      try {
-        LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
-        tableService.deleteTable(tableRef);
-      } catch (Exception e) {
-        LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
-      }
-    }
-  }
+  public static class TempTableCleanupFn
+      extends DoFn<KV<TableDestination, List<String>>, TableDestination> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TempTableCleanupFn.class);
+
+    private final BigQueryServices bqServices;
+    private transient @Nullable DatasetService datasetService;
+
+    public TempTableCleanupFn(BigQueryServices bqServices) {
+      this.bqServices = bqServices;
+    }
+
+    @ProcessElement
+    public void processElement(
+        PipelineOptions pipelineOptions,
+        @Element KV<TableDestination, List<String>> tempTable,
+        OutputReceiver<TableDestination> destinationOutputReceiver) {
+      List<TableReference> tableReferences =
+          tempTable.getValue().stream()
+              .map(tableName -> BigQueryHelpers.fromJsonString(tableName, TableReference.class))
+              .collect(Collectors.toList());
+      removeTemporaryTables(getDatasetService(pipelineOptions), tableReferences);
+      destinationOutputReceiver.output(tempTable.getKey());
+    }
+
+    private DatasetService getDatasetService(PipelineOptions pipelineOptions) {
+      if (datasetService == null) {
+        datasetService = bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
+      }
+      return datasetService;
+    }
+
+    @VisibleForTesting
+    public static void removeTemporaryTables(
+        DatasetService datasetService, List<TableReference> tempTables) {
+      for (TableReference tableRef : tempTables) {
+        try {
+          LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
+          datasetService.deleteTable(tableRef);
+        } catch (Exception e) {
+          LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
+        }
+      }
+    }
+
+    @Teardown
+    public void onTeardown() {
+      try {
+        if (datasetService != null) {
+          datasetService.close();
+          datasetService = null;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }

-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-
-    builder
-        .add(
-            DisplayData.item("firstPaneWriteDisposition", firstPaneWriteDisposition.toString())
-                .withLabel("Write Disposition"))
-        .add(
-            DisplayData.item("firstPaneCreateDisposition", firstPaneCreateDisposition.toString())
-                .withLabel("Create Disposition"))
-        .add(
-            DisplayData.item("launchesBigQueryJobs", true)
-                .withLabel("This transform launches BigQuery jobs to read/write elements."));
-  }
 }
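A note on the Serializable changes threaded through this file and BigQueryHelpers: TableReference is a Google API model class that, to my knowledge, does not implement java.io.Serializable, which is why PendingJobData (and the KV values handed to TempTableCleanupFn) carry temp tables as JSON strings instead. A sketch of the round-trip, assuming the same-package access to BigQueryHelpers that WriteRename itself has; the wrapper class is hypothetical:

    package org.apache.beam.sdk.io.gcp.bigquery;

    import com.google.api.services.bigquery.model.TableReference;

    class TempTableEncodingSketch {
      public static void main(String[] args) {
        TableReference ref =
            new TableReference().setProjectId("p").setDatasetId("d").setTableId("temp_t");
        // Encode to a plain String: Serializable and trivially coder-friendly.
        String json = BigQueryHelpers.toJsonString(ref);
        // Decode on the far side of the shuffle, as TempTableCleanupFn does.
        TableReference back = BigQueryHelpers.fromJsonString(json, TableReference.class);
        System.out.println(back.getTableId()); // temp_t
      }
    }
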
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 55269342155f..89cbc2cd24b8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.gcp.bigquery;

 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -92,6 +93,7 @@
 import org.apache.beam.runners.direct.DirectOptions;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
@@ -105,6 +107,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
 import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteRename.TempTableCleanupFn;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
@@ -256,7 +259,9 @@ public void evaluate() throws Throwable {
       };

   @Rule public transient ExpectedException thrown = ExpectedException.none();
-  @Rule public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(WriteRename.class);
+
+  @Rule
+  public transient ExpectedLogs loggedWriteRename = ExpectedLogs.none(TempTableCleanupFn.class);

   private FakeDatasetService fakeDatasetService = new FakeDatasetService();
   private FakeJobService fakeJobService = new FakeJobService();
@@ -2529,7 +2534,7 @@ public void testWriteTables() throws Exception {
     PCollection<KV<String, WriteTables.Result>> writeTablesOutput =
         writeTablesInput
             .apply(writeTables)
-            .setCoder(KvCoder.of(StringUtf8Coder.of(), WriteTables.ResultCoder.INSTANCE))
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), INSTANCE))
             .apply(
                 ParDo.of(
                     new DoFn<
@@ -2629,13 +2634,19 @@ public void testWriteRename() throws Exception {
             BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED,
             3,
             "kms_key",
-            null);
+            null,
+            jobIdTokenView);

-    DoFnTester<Iterable<KV<TableDestination, WriteTables.Result>>, TableDestination> tester =
-        DoFnTester.of(writeRename);
-    tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
-    tester.processElement(tempTablesElement);
-    tester.finishBundle();
+    // Unfortunate hack to have Create treat tempTablesElement as a single element, instead of as
+    // an iterable
+    p.apply(
+            Create.of(
+                    ImmutableList.of(
+                        (Iterable<KV<TableDestination, WriteTables.Result>>) tempTablesElement))
+                .withCoder(IterableCoder.of(KvCoder.of(TableDestinationCoder.of(), INSTANCE))))
+        .apply(writeRename);
+
+    p.run().waitUntilFinish();

     for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
       TableDestination tableDestination = entry.getKey();
@@ -2683,7 +2694,7 @@ public void testRemoveTemporaryTables() throws Exception {
     tableRefs.add(
         BigQueryHelpers.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, "table4")));
datasetId, "table4"))); - WriteRename.removeTemporaryTables(datasetService, tableRefs); + WriteRename.TempTableCleanupFn.removeTemporaryTables(datasetService, tableRefs); for (TableReference ref : tableRefs) { loggedWriteRename.verifyDebug("Deleting table " + toJsonString(ref));