Skip to content

Commit

Permalink
Log the resource id when IOException happens in FileBasedSource (#30336)
Browse files Browse the repository at this point in the history
* Log the resource id when IOException happens in FileBasedSource

* Switch to throw RuntimeException

* Fixed the tests

* Log the error and keep throwing IOException

* fix the spotless
  • Loading branch information
liferoad authored Feb 20, 2024
1 parent 801252a commit 0d01f6e
Showing 1 changed file with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -472,23 +473,30 @@ public synchronized FileBasedSource<T> getCurrentSource() {
@Override
protected final boolean startImpl() throws IOException {
FileBasedSource<T> source = getCurrentSource();
this.channel = FileSystems.open(source.getSingleFileMetadata().resourceId());
if (channel instanceof SeekableByteChannel) {
SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
seekChannel.position(source.getStartOffset());
} else {
// Channel is not seekable. Must not be a subrange.
checkArgument(
source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
"Subrange-based sources must only be defined for file types that support seekable "
+ " read channels");
checkArgument(
source.getStartOffset() == 0,
"Start offset %s is not zero but channel for reading the file is not seekable.",
source.getStartOffset());
}
ResourceId resourceId = source.getSingleFileMetadata().resourceId();
try {
this.channel = FileSystems.open(resourceId);
if (channel instanceof SeekableByteChannel) {
SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
seekChannel.position(source.getStartOffset());
} else {
// Channel is not seekable. Must not be a subrange.
checkArgument(
source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
"Subrange-based sources must only be defined for file types that support seekable "
+ " read channels");
checkArgument(
source.getStartOffset() == 0,
"Start offset %s is not zero but channel for reading the file is not seekable.",
source.getStartOffset());
}

startReading(channel);
startReading(channel);
} catch (IOException e) {
LOG.error(
"Failed to process {}, which could be corrupted or have a wrong format.", resourceId);
throw new IOException(e);
}

// Advance once to load the first record.
return advanceImpl();
Expand Down

0 comments on commit 0d01f6e

Please sign in to comment.