-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add e2e acknowledgments support to opensearch source #3278
Conversation
Signed-off-by: Taylor Gray <[email protected]>
|
||
public class WorkerCommonUtils { | ||
|
||
static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this value so large?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the same approach that was taken with the s3 source with scan, and it's the approach kafka source had to take as well after @hshardeesi discovered a bug with acknowledgments where a timeout was crashing the pipeline.
The only source that currently has a non-infinite acknowledgment timeout is s3 with sqs, because it is bound by visibility_timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When does a node give up its index to another node (in the context of source coordination)? I'd expect these values should be aligned such that when giving it up, the acknowledgement timeout ends.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Acknowledgments does not support extending the timeout currently. Source coordination renews the timeout every time saveProgressStateForPartition
is called
processIndex(indexPartition.get(), acknowledgementSet.getLeft()); | ||
|
||
completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), | ||
indexPartition.get(), sourceCoordinator); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears there is some opportunity to consolidate code in the workers. This behavior should be common and doesn't need to vary between workers, right?
Perhaps there can be a higher-level abstraction over the workers that handles the common behaviors. Or perhaps use a common base class.
Maybe this is something for a follow-on PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I didn't want to do a full refactor here as that would be pretty large, but I at least didn't duplicate the new code in this PR. The code differs some in certain exceptions that are needed to be caught and in how the index is processed (processIndex
function), but is similar in some other ways (sleeping and grabbing partitions, creating ack set, closing partitions, a good amount of exception handling is also similar)
indexPartition.getPartitionKey(), | ||
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(), | ||
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to handle result==false
case? We don't have any code doing negative acks, so it is not mandatory now. But may be used in future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the callout. Just curious how are we planning on handling negative acks? Just retrying the entire object, index, etc?
…ect#3278) Signed-off-by: Taylor Gray <[email protected]>
Description
Adds acknowledgment support to the opensearch source
Issues Resolved
Related to #1985
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.