Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support creating BigLake managed tables #33125

Merged
merged 5 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -629,6 +630,9 @@ public class BigQueryIO {
private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema;

static final String CONNECTION_ID = "connectionId";
static final String STORAGE_URI = "storageUri";

/**
* @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link
* #readTableRows()} does exactly the same as {@link #read}, however {@link
Expand Down Expand Up @@ -2372,6 +2376,8 @@ public enum Method {
/** Table description. Default is empty. */
abstract @Nullable String getTableDescription();

abstract @Nullable Map<String, String> getBigLakeConfiguration();

/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();

Expand Down Expand Up @@ -2484,6 +2490,8 @@ abstract Builder<T> setAvroSchemaFactory(

abstract Builder<T> setTableDescription(String tableDescription);

abstract Builder<T> setBigLakeConfiguration(Map<String, String> bigLakeConfiguration);

abstract Builder<T> setValidate(boolean validate);

abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
Expand Down Expand Up @@ -2909,6 +2917,30 @@ public Write<T> withTableDescription(String tableDescription) {
return toBuilder().setTableDescription(tableDescription).build();
}

/**
* Specifies a configuration to create BigLake tables. The following options are available:
*
* <ul>
* <li>connectionId (REQUIRED): the name of your cloud resource connection.
* <li>storageUri (REQUIRED): the path to your GCS folder where data will be written to. This
* sink will create sub-folders for each project, dataset, and table destination. Example:
* if you specify a storageUri of {@code "gs://foo/bar"} and writing to table {@code
* "my_project.my_dataset.my_table"}, your data will be written under {@code
* "gs://foo/bar/my_project/my_dataset/my_table/"}
* <li>fileFormat (OPTIONAL): defaults to {@code "parquet"}
* <li>tableFormat (OPTIONAL): defaults to {@code "iceberg"}
* </ul>
*
* <p><b>NOTE:</b> This is only supported with the Storage Write API methods.
*
* @see <a href="https://cloud.google.com/bigquery/docs/iceberg-tables#api">BigQuery Tables for
* Apache Iceberg documentation</a>
*/
public Write<T> withBigLakeConfiguration(Map<String, String> bigLakeConfiguration) {
checkArgument(bigLakeConfiguration != null, "bigLakeConfiguration can not be null");
return toBuilder().setBigLakeConfiguration(bigLakeConfiguration).build();
}

/**
* Specifies a policy for handling failed inserts.
*
Expand Down Expand Up @@ -3454,8 +3486,21 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
checkArgument(
!getAutoSchemaUpdate(),
"withAutoSchemaUpdate only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
} else if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) {
LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
checkArgument(
getBigLakeConfiguration() == null,
"bigLakeConfiguration is only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
} else {
if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) {
LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
}
if (getBigLakeConfiguration() != null) {
checkArgument(
Arrays.stream(new String[] {CONNECTION_ID, STORAGE_URI})
.allMatch(getBigLakeConfiguration()::containsKey),
String.format(
"bigLakeConfiguration must contain keys '%s' and '%s'",
CONNECTION_ID, STORAGE_URI));
}
}
if (getRowMutationInformationFn() != null) {
checkArgument(
Expand Down Expand Up @@ -3905,6 +3950,7 @@ private <DestinationT> WriteResult continueExpandTyped(
getPropagateSuccessfulStorageApiWritesPredicate(),
getRowMutationInformationFn() != null,
getDefaultMissingValueInterpretation(),
getBigLakeConfiguration(),
getBadRecordRouter(),
getBadRecordErrorHandler());
return input.apply("StorageApiLoads", storageApiLoads);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator<Wri
.addNullableByteArrayField("write_disposition")
.addNullableArrayField("schema_update_options", FieldType.BYTES)
.addNullableStringField("table_description")
.addNullableMapField("biglake_configuration", FieldType.STRING, FieldType.STRING)
.addNullableBooleanField("validate")
.addNullableByteArrayField("bigquery_services")
.addNullableInt32Field("max_files_per_bundle")
Expand Down Expand Up @@ -510,6 +511,9 @@ public Row toConfigRow(Write<?> transform) {
if (transform.getTableDescription() != null) {
fieldValues.put("table_description", transform.getTableDescription());
}
if (transform.getBigLakeConfiguration() != null) {
fieldValues.put("biglake_configuration", transform.getBigLakeConfiguration());
}
fieldValues.put("validate", transform.getValidate());
if (transform.getBigQueryServices() != null) {
fieldValues.put("bigquery_services", toByteArray(transform.getBigQueryServices()));
Expand Down Expand Up @@ -719,6 +723,10 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
if (tableDescription != null) {
builder = builder.setTableDescription(tableDescription);
}
Map<String, String> biglakeConfiguration = configRow.getMap("biglake_configuration");
if (biglakeConfiguration != null) {
builder = builder.setBigLakeConfiguration(biglakeConfiguration);
}
Boolean validate = configRow.getBoolean("validate");
if (validate != null) {
builder = builder.setValidate(validate);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.CONNECTION_ID;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.STORAGE_URI;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.gax.rpc.ApiException;
import com.google.api.services.bigquery.model.BigLakeConfiguration;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.EncryptionConfiguration;
import com.google.api.services.bigquery.model.Table;
Expand All @@ -31,6 +34,7 @@
import com.google.api.services.bigquery.model.TimePartitioning;
import io.grpc.StatusRuntimeException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,6 +45,7 @@
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -91,7 +96,8 @@ static TableDestination possiblyCreateTable(
CreateDisposition createDisposition,
@Nullable Coder<?> tableDestinationCoder,
@Nullable String kmsKey,
BigQueryServices bqServices) {
BigQueryServices bqServices,
@Nullable Map<String, String> bigLakeConfiguration) {
checkArgument(
tableDestination.getTableSpec() != null,
"DynamicDestinations.getTable() must return a TableDestination "
Expand Down Expand Up @@ -132,7 +138,8 @@ static TableDestination possiblyCreateTable(
createDisposition,
tableSpec,
kmsKey,
bqServices);
bqServices,
bigLakeConfiguration);
}
}
}
Expand All @@ -147,7 +154,8 @@ private static void tryCreateTable(
CreateDisposition createDisposition,
String tableSpec,
@Nullable String kmsKey,
BigQueryServices bqServices) {
BigQueryServices bqServices,
@Nullable Map<String, String> bigLakeConfiguration) {
TableReference tableReference = tableDestination.getTableReference().clone();
tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
try (DatasetService datasetService = bqServices.getDatasetService(options)) {
Expand Down Expand Up @@ -189,6 +197,24 @@ private static void tryCreateTable(
if (kmsKey != null) {
table.setEncryptionConfiguration(new EncryptionConfiguration().setKmsKeyName(kmsKey));
}
if (bigLakeConfiguration != null) {
TableReference ref = table.getTableReference();
table.setBiglakeConfiguration(
new BigLakeConfiguration()
.setTableFormat(
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
.setFileFormat(
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
.setConnectionId(
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
.setStorageUri(
String.format(
"%s/%s/%s/%s",
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
ref.getProjectId(),
ref.getDatasetId(),
ref.getTableId())));
}
datasetService.createTable(table);
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public void processElement(ProcessContext context) {
createDisposition,
dynamicDestinations.getDestinationCoder(),
kmsKey,
bqServices);
bqServices,
null);
});

context.output(KV.of(tableDestination, context.element().getValue()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class StorageApiLoads<DestinationT, ElementT>
private final boolean usesCdc;

private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation;
private final Map<String, String> bigLakeConfiguration;

private final BadRecordRouter badRecordRouter;

Expand All @@ -98,6 +100,7 @@ public StorageApiLoads(
Predicate<String> propagateSuccessfulStorageApiWritesPredicate,
boolean usesCdc,
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation,
Map<String, String> bigLakeConfiguration,
BadRecordRouter badRecordRouter,
ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
this.destinationCoder = destinationCoder;
Expand All @@ -118,6 +121,7 @@ public StorageApiLoads(
this.successfulRowsPredicate = propagateSuccessfulStorageApiWritesPredicate;
this.usesCdc = usesCdc;
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this.bigLakeConfiguration = bigLakeConfiguration;
this.badRecordRouter = badRecordRouter;
this.badRecordErrorHandler = badRecordErrorHandler;
}
Expand Down Expand Up @@ -186,7 +190,8 @@ public WriteResult expandInconsistent(
createDisposition,
kmsKey,
usesCdc,
defaultMissingValueInterpretation));
defaultMissingValueInterpretation,
bigLakeConfiguration));

PCollection<BigQueryStorageApiInsertError> insertErrors =
PCollectionList.of(convertMessagesResult.get(failedRowsTag))
Expand Down Expand Up @@ -279,7 +284,8 @@ public WriteResult expandTriggered(
successfulRowsPredicate,
autoUpdateSchema,
ignoreUnknownValues,
defaultMissingValueInterpretation));
defaultMissingValueInterpretation,
bigLakeConfiguration));

PCollection<BigQueryStorageApiInsertError> insertErrors =
PCollectionList.of(convertMessagesResult.get(failedRowsTag))
Expand Down Expand Up @@ -372,7 +378,8 @@ public WriteResult expandUntriggered(
createDisposition,
kmsKey,
usesCdc,
defaultMissingValueInterpretation));
defaultMissingValueInterpretation,
bigLakeConfiguration));

PCollection<BigQueryStorageApiInsertError> insertErrors =
PCollectionList.of(convertMessagesResult.get(failedRowsTag))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import java.util.Map;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
private final @Nullable String kmsKey;
private final boolean usesCdc;
private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation;
private final @Nullable Map<String, String> bigLakeConfiguration;

public StorageApiWriteRecordsInconsistent(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
Expand All @@ -69,7 +71,8 @@ public StorageApiWriteRecordsInconsistent(
BigQueryIO.Write.CreateDisposition createDisposition,
@Nullable String kmsKey,
boolean usesCdc,
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation,
@Nullable Map<String, String> bigLakeConfiguration) {
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.failedRowsTag = failedRowsTag;
Expand All @@ -83,6 +86,7 @@ public StorageApiWriteRecordsInconsistent(
this.kmsKey = kmsKey;
this.usesCdc = usesCdc;
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this.bigLakeConfiguration = bigLakeConfiguration;
}

@Override
Expand Down Expand Up @@ -116,7 +120,8 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePaylo
kmsKey,
usesCdc,
defaultMissingValueInterpretation,
bigQueryOptions.getStorageWriteApiMaxRetries()))
bigQueryOptions.getStorageWriteApiMaxRetries(),
bigLakeConfiguration))
.withOutputTags(finalizeTag, tupleTagList)
.withSideInputs(dynamicDestinations.getSideInputs()));
result.get(failedRowsTag).setCoder(failedRowsCoder);
Expand Down
Loading
Loading