Skip to content

Commit

Permalink
Support withFormatRecordOnFailureFunction() for BigQuery STORAGE_WRIT…
Browse files Browse the repository at this point in the history
…E_API and STORAGE_API_AT_LEAST_ONCE methods (apache#31659)

* Support withFormatRecordOnFailureFunction() for BigQuery STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods

* Update CHANGES.md
  • Loading branch information
Amar3tto authored and reeba212 committed Dec 4, 2024
1 parent ad19010 commit d75c7a4
Show file tree
Hide file tree
Showing 13 changed files with 522 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)).
* [BigQueryIO] Added support for withFormatRecordOnFailureFunction() for STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods (Java) ([#31354](https://github.com/apache/beam/issues/31354)).
* Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)).

## Breaking Changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2711,9 +2711,14 @@ public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunct
}

/**
* If an insert failure occurs, this function is applied to the originally supplied row T. The
* resulting {@link TableRow} will be accessed via {@link
* WriteResult#getFailedInsertsWithErr()}.
* If an insert failure occurs, this function is applied to the originally supplied T element.
*
* <p>For {@link Method#STREAMING_INSERTS} method, the resulting {@link TableRow} will be
* accessed via {@link WriteResult#getFailedInsertsWithErr()}.
*
* <p>For {@link Method#STORAGE_WRITE_API} and {@link Method#STORAGE_API_AT_LEAST_ONCE} methods,
* the resulting {@link TableRow} will be accessed via {@link
* WriteResult#getFailedStorageApiInserts()}.
*/
public Write<T> withFormatRecordOnFailureFunction(
SerializableFunction<T, TableRow> formatFunction) {
Expand Down Expand Up @@ -3773,6 +3778,7 @@ private <DestinationT> WriteResult continueExpandTyped(
dynamicDestinations,
elementSchema,
elementToRowFunction,
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
// We could support both of these by falling back to
Expand All @@ -3795,7 +3801,9 @@ private <DestinationT> WriteResult continueExpandTyped(
storageApiDynamicDestinations =
(StorageApiDynamicDestinations<T, DestinationT>)
new StorageApiDynamicDestinationsProto(
dynamicDestinations, getWriteProtosClass());
dynamicDestinations,
getWriteProtosClass(),
getFormatRecordOnFailureFunction());
} else if (getAvroRowWriterFactory() != null) {
// we can configure the avro to storage write api proto converter for this
// assuming the format function returns an Avro GenericRecord
Expand All @@ -3818,6 +3826,7 @@ private <DestinationT> WriteResult continueExpandTyped(
dynamicDestinations,
avroSchemaFactory,
recordWriterFactory.getToAvroFn(),
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else {
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
Expand All @@ -3827,6 +3836,7 @@ private <DestinationT> WriteResult continueExpandTyped(
new StorageApiDynamicDestinationsTableRow<>(
dynamicDestinations,
tableRowWriterFactory.getToRowFn(),
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null,
getCreateDisposition(),
getIgnoreUnknownValues(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
Expand All @@ -42,6 +43,8 @@ abstract static class Value {
abstract ProtoRows getProtoRows();

abstract List<Instant> getTimestamps();

abstract List<@Nullable TableRow> getFailsafeTableRows();
}

interface ConvertUnknownFields {
Expand Down Expand Up @@ -96,11 +99,18 @@ public Value next() {
}

List<Instant> timestamps = Lists.newArrayList();
List<@Nullable TableRow> failsafeRows = Lists.newArrayList();
ProtoRows.Builder inserts = ProtoRows.newBuilder();
long bytesSize = 0;
while (underlyingIterator.hasNext()) {
StorageApiWritePayload payload = underlyingIterator.next();
ByteString byteString = ByteString.copyFrom(payload.getPayload());
@Nullable TableRow failsafeTableRow = null;
try {
failsafeTableRow = payload.getFailsafeTableRow();
} catch (IOException e) {
// Do nothing, table row will be generated later from row bytes
}
if (autoUpdateSchema) {
try {
@Nullable TableRow unknownFields = payload.getUnknownFields();
Expand All @@ -116,7 +126,10 @@ public Value next() {
// This generally implies that ignoreUnknownValues=false and there were still
// unknown values here.
// Reconstitute the TableRow and send it to the failed-rows consumer.
TableRow tableRow = protoToTableRow.apply(byteString);
TableRow tableRow =
failsafeTableRow != null
? failsafeTableRow
: protoToTableRow.apply(byteString);
// TODO(24926, reuvenlax): We need to merge the unknown fields in! Currently we
// only execute this
// codepath when ignoreUnknownFields==true, so we should never hit this codepath.
Expand All @@ -142,12 +155,13 @@ public Value next() {
timestamp = elementsTimestamp;
}
timestamps.add(timestamp);
failsafeRows.add(failsafeTableRow);
bytesSize += byteString.size();
if (bytesSize > splitSize) {
break;
}
}
return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps);
return new AutoValue_SplittingIterable_Value(inserts.build(), timestamps, failsafeRows);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,17 @@ public void processElement(
.withTimestamp(timestamp);
o.get(successfulWritesTag).output(KV.of(element.getKey(), payload));
} catch (TableRowToStorageApiProto.SchemaConversionException conversionException) {
TableRow tableRow;
TableRow failsafeTableRow;
try {
tableRow = messageConverter.toTableRow(element.getValue());
failsafeTableRow = messageConverter.toFailsafeTableRow(element.getValue());
} catch (Exception e) {
badRecordRouter.route(o, element, elementCoder, e, "Unable to convert value to TableRow");
return;
}
o.get(failedWritesTag)
.output(new BigQueryStorageApiInsertError(tableRow, conversionException.toString()));
.output(
new BigQueryStorageApiInsertError(
failsafeTableRow, conversionException.toString()));
} catch (Exception e) {
badRecordRouter.route(
o, element, elementCoder, e, "Unable to convert value to StorageWriteApiPayload");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface MessageConverter<T> {
StorageApiWritePayload toMessage(
T element, @Nullable RowMutationInformation rowMutationInformation) throws Exception;

TableRow toTableRow(T element);
TableRow toFailsafeTableRow(T element);
}

StorageApiDynamicDestinations(DynamicDestinations<T, DestinationT> inner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,20 @@ class StorageApiDynamicDestinationsBeamRow<T, DestinationT extends @NonNull Obje
extends StorageApiDynamicDestinations<T, DestinationT> {
private final TableSchema tableSchema;
private final SerializableFunction<T, Row> toRow;
private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;

private final boolean usesCdc;

StorageApiDynamicDestinationsBeamRow(
DynamicDestinations<T, DestinationT> inner,
Schema schema,
SerializableFunction<T, Row> toRow,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
boolean usesCdc) {
super(inner);
this.tableSchema = BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(schema);
this.toRow = toRow;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.usesCdc = usesCdc;
}

Expand Down Expand Up @@ -96,12 +99,19 @@ public StorageApiWritePayload toMessage(
Message msg =
BeamRowToStorageApiProto.messageFromBeamRow(
descriptorToUse, toRow.apply(element), changeType, changeSequenceNum);
return StorageApiWritePayload.of(msg.toByteArray(), null);
return StorageApiWritePayload.of(
msg.toByteArray(),
null,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}

@Override
public TableRow toTableRow(T element) {
return BigQueryUtils.toTableRow(toRow.apply(element));
public TableRow toFailsafeTableRow(T element) {
if (formatRecordOnFailureFunction != null) {
return formatRecordOnFailureFunction.apply(element);
} else {
return BigQueryUtils.toTableRow(toRow.apply(element));
}
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ class StorageApiDynamicDestinationsGenericRecord<T, DestinationT extends @NonNul

private final SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord;
private final SerializableFunction<@Nullable TableSchema, Schema> schemaFactory;
private final @javax.annotation.Nullable SerializableFunction<T, TableRow>
formatRecordOnFailureFunction;

private boolean usesCdc;

StorageApiDynamicDestinationsGenericRecord(
DynamicDestinations<T, DestinationT> inner,
SerializableFunction<@Nullable TableSchema, Schema> schemaFactory,
SerializableFunction<AvroWriteRequest<T>, GenericRecord> toGenericRecord,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
boolean usesCdc) {
super(inner);
this.toGenericRecord = toGenericRecord;
this.schemaFactory = schemaFactory;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.usesCdc = usesCdc;
}

Expand Down Expand Up @@ -96,13 +101,20 @@ public StorageApiWritePayload toMessage(
toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)),
changeType,
changeSequenceNum);
return StorageApiWritePayload.of(msg.toByteArray(), null);
return StorageApiWritePayload.of(
msg.toByteArray(),
null,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}

@Override
public TableRow toTableRow(T element) {
return BigQueryUtils.convertGenericRecordToTableRow(
toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema);
public TableRow toFailsafeTableRow(T element) {
if (formatRecordOnFailureFunction != null) {
return formatRecordOnFailureFunction.apply(element);
} else {
return BigQueryUtils.convertGenericRecordToTableRow(
toGenericRecord.apply(new AvroWriteRequest<>(element, avroSchema)), bqTableSchema);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,29 @@
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import java.lang.reflect.InvocationTargetException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.checkerframework.checker.nullness.qual.NonNull;

/** Storage API DynamicDestinations used when the input is a compiled protocol buffer. */
class StorageApiDynamicDestinationsProto<T extends Message, DestinationT extends @NonNull Object>
extends StorageApiDynamicDestinations<T, DestinationT> {
DescriptorProtos.DescriptorProto descriptorProto;
private final DescriptorProtos.DescriptorProto descriptorProto;
private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;

@SuppressWarnings({"unchecked", "nullness"})
StorageApiDynamicDestinationsProto(
DynamicDestinations<T, DestinationT> inner, Class<T> protoClass) {
DynamicDestinations<T, DestinationT> inner,
Class<T> protoClass,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction) {
super(inner);
try {
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.descriptorProto =
fixNestedTypes(
(Descriptors.Descriptor)
Expand Down Expand Up @@ -84,12 +90,27 @@ public StorageApiWritePayload toMessage(
// we can forward
// the through directly. This means that we don't currently support ignoreUnknownValues or
// autoUpdateSchema.
return StorageApiWritePayload.of(element.toByteArray(), null);
return StorageApiWritePayload.of(
element.toByteArray(),
null,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}

@Override
public TableRow toTableRow(T element) {
throw new RuntimeException("Not implemented!");
public TableRow toFailsafeTableRow(T element) {
if (formatRecordOnFailureFunction != null) {
return formatRecordOnFailureFunction.apply(element);
} else {
try {
return TableRowToStorageApiProto.tableRowFromMessage(
DynamicMessage.parseFrom(
TableRowToStorageApiProto.wrapDescriptorProto(descriptorProto),
element.toByteArray()),
true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonNull Object>
extends StorageApiDynamicDestinations<T, DestinationT> {
private final SerializableFunction<T, TableRow> formatFunction;
private final @Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction;

private final boolean usesCdc;
private final CreateDisposition createDisposition;
Expand All @@ -51,12 +52,14 @@ public class StorageApiDynamicDestinationsTableRow<T, DestinationT extends @NonN
StorageApiDynamicDestinationsTableRow(
DynamicDestinations<T, DestinationT> inner,
SerializableFunction<T, TableRow> formatFunction,
@Nullable SerializableFunction<T, TableRow> formatRecordOnFailureFunction,
boolean usesCdc,
CreateDisposition createDisposition,
boolean ignoreUnknownValues,
boolean autoSchemaUpdates) {
super(inner);
this.formatFunction = formatFunction;
this.formatRecordOnFailureFunction = formatRecordOnFailureFunction;
this.usesCdc = usesCdc;
this.createDisposition = createDisposition;
this.ignoreUnknownValues = ignoreUnknownValues;
Expand Down Expand Up @@ -151,8 +154,12 @@ public DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
}

@Override
public TableRow toTableRow(T element) {
return formatFunction.apply(element);
public TableRow toFailsafeTableRow(T element) {
if (formatRecordOnFailureFunction != null) {
return formatRecordOnFailureFunction.apply(element);
} else {
return formatFunction.apply(element);
}
}

@Override
Expand Down Expand Up @@ -183,7 +190,10 @@ public StorageApiWritePayload toMessage(
unknownFields,
changeType,
changeSequenceNum);
return StorageApiWritePayload.of(msg.toByteArray(), unknownFields);
return StorageApiWritePayload.of(
msg.toByteArray(),
unknownFields,
formatRecordOnFailureFunction != null ? toFailsafeTableRow(element) : null);
}
};
}
Loading

0 comments on commit d75c7a4

Please sign in to comment.