Skip to content

Commit

Permalink
Merge pull request apache#17520 from BEAM-12356 Close DatasetService …
Browse files Browse the repository at this point in the history
…leaked with getTable
  • Loading branch information
baeminbo authored May 16, 2022
1 parent e27f565 commit e84bd61
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand Down Expand Up @@ -173,7 +174,9 @@ protected Table getTargetTable(BigQueryOptions options) throws Exception {
location,
queryTempDataset,
kmsKey);
return bqServices.getDatasetService(options).getTable(queryResultTable);
try (DatasetService datasetService = bqServices.getDatasetService(options)) {
return datasetService.getTable(queryResultTable);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand Down Expand Up @@ -183,8 +184,10 @@ protected String getTargetTableId(BigQueryOptions options) throws Exception {
? options.getProject()
: options.getBigQueryProject());
}
Table table = bqServices.getDatasetService(options).getTable(tableReference);
cachedTable.compareAndSet(null, table);
try (DatasetService datasetService = bqServices.getDatasetService(options)) {
Table table = datasetService.getTable(tableReference);
cachedTable.compareAndSet(null, table);
}
}

return cachedTable.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
Expand Down Expand Up @@ -70,14 +71,16 @@ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws E
if (tableSizeBytes.get() == null) {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableRef = tableDef.getTableReference(bqOptions);
Table table = bqServices.getDatasetService(bqOptions).getTable(tableRef);
Long numBytes = table.getNumBytes();
if (table.getStreamingBuffer() != null
&& table.getStreamingBuffer().getEstimatedBytes() != null) {
numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
}
try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
Table table = datasetService.getTable(tableRef);
Long numBytes = table.getNumBytes();
if (table.getStreamingBuffer() != null
&& table.getStreamingBuffer().getEstimatedBytes() != null) {
numBytes += table.getStreamingBuffer().getEstimatedBytes().longValue();
}

tableSizeBytes.compareAndSet(null, numBytes);
tableSizeBytes.compareAndSet(null, numBytes);
}
}
return tableSizeBytes.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
Expand Down Expand Up @@ -106,11 +107,12 @@ public <T> BigQuerySourceBase<T> toSource(
@Override
public Schema getBeamSchema(BigQueryOptions bqOptions) {
try {
TableReference tableRef = getTableReference(bqOptions);
TableSchema tableSchema =
bqServices.getDatasetService(bqOptions).getTable(tableRef).getSchema();
return BigQueryUtils.fromTableSchema(tableSchema);
} catch (IOException | InterruptedException | NullPointerException e) {
try (DatasetService datasetService = bqServices.getDatasetService(bqOptions)) {
TableReference tableRef = getTableReference(bqOptions);
TableSchema tableSchema = datasetService.getTable(tableRef).getSchema();
return BigQueryUtils.fromTableSchema(tableSchema);
}
} catch (Exception e) {
throw new BigQuerySchemaRetrievalException("Exception while trying to retrieve schema", e);
}
}
Expand Down

0 comments on commit e84bd61

Please sign in to comment.