FileReader improvements #337
Conversation
final ListObjectsV2Request request = new ListObjectsV2Request().withBucketName(bucketName)
        .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR);
final ListObjectsV2Result objectListing = s3Client.listObjectsV2(request);
currentBatch = objectListing.getObjectSummaries()
This is where I thought we could make an improvement, by only querying S3 for objects after the last known processed file.
We could have a config item for the first run, but after that we could track the last processed file and start the next query after it, reducing the number of requests we make to S3 and starting up faster.
At the moment we stream every file in the S3 bucket every time we call fetchObjectSummaries().
This is a known issue and is a little out of scope for this PR. Will add the startAfter
property, as discussed, in the next PRs.
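
For illustration, a minimal sketch of what a startAfter-based listing could look like, assuming the connector keeps track of the key of the last processed object; the lastProcessedKey field below is hypothetical and not part of this PR:

```java
// Hypothetical sketch: resume the listing after the last processed key so we
// do not re-list objects that were already handled on a previous poll.
final ListObjectsV2Request request = new ListObjectsV2Request()
        .withBucketName(bucketName)
        .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR);

if (lastProcessedKey != null) {
    // lastProcessedKey is an assumed field, persisted and restored elsewhere.
    request.withStartAfter(lastProcessedKey);
}

final ListObjectsV2Result objectListing = s3Client.listObjectsV2(request);
```

One thing to keep in mind: S3 only honours StartAfter on the first page of a listing; once the response is truncated, the continuation token takes over for subsequent requests.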
How about using the S3ObjectSummaryIterator from my earlier work?
46fab1c
LGTM
        .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey())));
return s3ObjectStream.iterator();
Rather than hardcoding the filter, why not pass a Predicate to do the filtering? Or, outside the S3ObjectSummaryIterator, you could use the Apache Commons IteratorUtils.filteredIterator to apply a predicate to the iterator.
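
For reference, a minimal sketch of the filteredIterator suggestion, assuming commons-collections4 (org.apache.commons.collections4.IteratorUtils) is on the classpath; summaryIterator and failedObjectKeys stand in for whatever the reader already holds:

```java
// Sketch only: wrap an existing Iterator<S3ObjectSummary> and apply the
// failed-key filter from the outside rather than hard-coding it in the reader.
// summaryIterator and failedObjectKeys are assumed to exist in the caller.
final Iterator<S3ObjectSummary> filtered = IteratorUtils.filteredIterator(
        summaryIterator,
        objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()));
```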
I agree with you, and I would like to have it done in a different way, I think. We should not need to pass the FileReader object here and there; we should just keep track of the files that have faulty records and filter them out later.
But this requires a bigger refactoring that is a little out of scope for this PR.
Thanks for bringing this up, though.
I think you can insert what we think might be the final approach into the current FileReader.
Actually, we can filter out bad records and record them inside a Predicate.
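
One possible shape for that idea, sketched with java.util.function.Predicate; every name in this snippet is illustrative rather than taken from the PR:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

import com.amazonaws.services.s3.model.S3ObjectSummary;

// Illustrative sketch: a predicate that filters out keys known to contain
// faulty records and exposes a hook for recording new failures, so the
// FileReader never has to be passed around just for filtering.
final class FailedKeyFilter implements Predicate<S3ObjectSummary> {
    private final Set<String> failedKeys = ConcurrentHashMap.newKeySet();

    void recordFailure(final String key) {
        failedKeys.add(key);
    }

    @Override
    public boolean test(final S3ObjectSummary summary) {
        return !failedKeys.contains(summary.getKey());
    }
}
```

The stream (or a filteredIterator) would then take an instance of this predicate, and whichever component detects a faulty record calls recordFailure on it.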
@Claudenw going to merge this and create an issue for follow-up. I think a wider refactoring sounds plausible in the future, perhaps when this is moved to commons.
Replacing the custom iterator, which has known issues raised in comments on previous PRs, with Java streams.
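
Pieced together from the diff fragments above, the stream-based listing roughly takes the following shape; this is a simplified sketch reusing names from the diff, with pagination handling omitted:

```java
// Simplified sketch of the new approach: list a page of objects, stream the
// summaries, drop keys that previously failed, and hand back an iterator.
final ListObjectsV2Request request = new ListObjectsV2Request()
        .withBucketName(bucketName)
        .withMaxKeys(s3SourceConfig.getInt(FETCH_PAGE_SIZE) * PAGE_SIZE_FACTOR);

final ListObjectsV2Result objectListing = s3Client.listObjectsV2(request);

final Stream<S3ObjectSummary> s3ObjectStream = objectListing.getObjectSummaries()
        .stream()
        .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.getKey()));

return s3ObjectStream.iterator();
```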