Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Error Handlers to File IO and related IOs (TextIO, AvroIO) #29670

Merged
merged 16 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand Down Expand Up @@ -236,6 +238,26 @@
* destination-dependent: every window/pane for every destination will use the same number of shards
* specified via {@link Write#withNumShards} or {@link Write#withSharding}.
*
* <h3>Handling Errors</h3>
*
* <p>When using dynamic destinations, or when using a formatting function to format a record for
* writing, it's possible for an individual record to be malformed, causing an exception. By
* default, these exceptions are propagated to the runner, and are usually retried, though this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these exceptions are propagated to the runner -> this means the bundle fails right? May be good to clearly mention that

* depends on the runner. Alternately, these errors can be routed to another {@link PTransform} by
* using {@link Write#withBadRecordErrorHandler(ErrorHandler)}. The ErrorHandler is registered with
* the pipeline (see below). See {@link ErrorHandler} for more documentation. Of note, this error
* handling only handles errors related to specific records. It does not handle errors related to
* connectivity, authorization, etc. as those should be retried by the runner.
*
* <pre>{@code
* PCollection<> records = ...;
* PTransform<PCollection<BadRecord>,?> alternateSink = ...;
* try (BadRecordErrorHandler<?> handler = pipeline.registerBadRecordErrorHandler(alternateSink) {
* records.apply("Write", FileIO.writeDynamic().otherConfigs()
* .withBadRecordErrorHandler(handler));
* }
* }</pre>
*
* <h3>Writing custom types to sinks</h3>
*
* <p>Normally, when writing a collection of a custom type using a {@link Sink} that takes a
Expand Down Expand Up @@ -1016,6 +1038,8 @@ public static FileNaming relativeFileNaming(

abstract boolean getNoSpilling();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<DestinationT, UserT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -1062,6 +1086,9 @@ abstract Builder<DestinationT, UserT> setSharding(

abstract Builder<DestinationT, UserT> setNoSpilling(boolean noSpilling);

abstract Builder<DestinationT, UserT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Write<DestinationT, UserT> build();
}

Expand Down Expand Up @@ -1288,6 +1315,12 @@ public Write<DestinationT, UserT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have the documentation live here instead of WriteFiles? I think users will interact with FileIO more as the top-level transform.

public Write<DestinationT, UserT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

@VisibleForTesting
Contextful<Fn<DestinationT, FileNaming>> resolveFileNamingFn() {
if (getDynamic()) {
Expand Down Expand Up @@ -1391,6 +1424,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
writeFiles = writeFiles.withNoSpilling();
}
if (getBadRecordErrorHandler() != null) {
writeFiles = writeFiles.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
return input.apply(writeFiles);
}

Expand Down
20 changes: 20 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
Expand Down Expand Up @@ -176,6 +178,10 @@
*
* <p>For backwards compatibility, {@link TextIO} also supports the legacy {@link
* DynamicDestinations} interface for advanced features via {@link Write#to(DynamicDestinations)}.
*
* <p>Error handling for records that are malformed can be handled by using {@link
* TypedWrite#withBadRecordErrorHandler(ErrorHandler)}. See documentation in
* {@link FileIO} for details on usage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We point to FileIO for details on usage and examples, but for the withBadRecordErrorHandler method definition below we point to WriteFiles (same pattern in AvroIO)

I think it makes sense to accumulate any error handling documentation in one place to make it easier for users to find and devs to update in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes a bunch of sense, I'll normalize it

*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down Expand Up @@ -708,6 +714,8 @@ public abstract static class TypedWrite<UserT, DestinationT>
*/
abstract WritableByteChannelFactory getWritableByteChannelFactory();

abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract Builder<UserT, DestinationT> toBuilder();

@AutoValue.Builder
Expand Down Expand Up @@ -754,6 +762,9 @@ abstract Builder<UserT, DestinationT> setNumShards(
abstract Builder<UserT, DestinationT> setWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory);

abstract Builder<UserT, DestinationT> setBadRecordErrorHandler(
@Nullable ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract TypedWrite<UserT, DestinationT> build();
}

Expand Down Expand Up @@ -993,6 +1004,12 @@ public TypedWrite<UserT, DestinationT> withNoSpilling() {
return toBuilder().setNoSpilling(true).build();
}

/** See {@link WriteFiles#withBadRecordErrorHandler(ErrorHandler)}. */
public TypedWrite<UserT, DestinationT> withBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> errorHandler) {
return toBuilder().setBadRecordErrorHandler(errorHandler).build();
}

/** Don't write any output files if the PCollection is empty. */
public TypedWrite<UserT, DestinationT> skipIfEmpty() {
return toBuilder().setSkipIfEmpty(true).build();
Expand Down Expand Up @@ -1083,6 +1100,9 @@ public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (getNoSpilling()) {
write = write.withNoSpilling();
}
if (getBadRecordErrorHandler() != null) {
write = write.withBadRecordErrorHandler(getBadRecordErrorHandler());
}
if (getSkipIfEmpty()) {
write = write.withSkipIfEmpty();
}
Expand Down
Loading
Loading