Add e2e acknowledgment and checkpointing to RDS source #4819
Signed-off-by: Hai Yan <[email protected]>
@@ -56,6 +57,12 @@ public class RdsSourceConfig {
    @JsonProperty("acknowledgments")
    private boolean acknowledgments = false;

    @JsonProperty("s3_data_file_acknowledgment_timeout")
    private Duration dataFileAcknowledgmentTimeout = Duration.ofMinutes(5);
What is the average time taken to process a data file? Is this timeout long enough?
It's pretty fast in my test, but that was not a large dataset. I guess we will need to measure and update accordingly. I will put a larger number here for now.
I see the DocumentDB source uses 2 hours by default. How was that number calculated?
} else {
    do {
        currentChangeEventStatus = changeEventStatuses.poll();
    } while (!changeEventStatuses.isEmpty());
Is it possible to get stuck in this while loop?
I see. Yes, if the add rate is equal to or higher than the poll rate. I can add another condition to exit the loop and checkpoint after a certain number of polls.
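For illustration, one way the bounded exit condition could look is sketched below. This is not the code that landed in the PR: the class name, the `pollLatest` helper, and the cap of 1000 polls are all hypothetical, and a plain queue of numbers stands in for `changeEventStatuses`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class BoundedDrainSketch {
    // Hypothetical cap on polls per checkpoint pass; not a value taken from the PR.
    static final int MAX_POLLS_PER_CHECKPOINT = 1000;

    /**
     * Polls at most maxPolls items and returns the last one seen,
     * or null if the queue was empty. The loop always terminates,
     * even if producers add items as fast as this method polls them.
     */
    static <T> T pollLatest(Queue<T> queue, int maxPolls) {
        T latest = null;
        for (int polled = 0; polled < maxPolls; polled++) {
            T next = queue.poll();
            if (next == null) {
                break; // queue drained before reaching the cap
            }
            latest = next;
        }
        return latest;
    }

    public static void main(String[] args) {
        Queue<Long> positions = new ConcurrentLinkedQueue<>();
        for (long i = 1; i <= 5; i++) {
            positions.add(i);
        }
        // With a cap of 3, the caller would checkpoint at the third status.
        System.out.println(pollLatest(positions, 3)); // prints 3
        System.out.println(positions.size());         // prints 2
    }
}
```

Anything left in the queue after the cap is reached would simply be picked up on the next checkpoint pass, so the checkpoint lags by at most one pass under sustained load.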
do {
    currentChangeEventStatus = changeEventStatuses.poll();
} while (!changeEventStatuses.isEmpty());
streamCheckpointer.checkpoint(currentChangeEventStatus.getBinlogCoordinate());
We should consider adding metrics for checkpoint count.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StreamCheckpointManager {
I don't see this used in the stream worker.
The stream worker here just connects to the binlog stream. The stream events are processed in the BinlogEventListener class. You can find StreamCheckpointManager used there.
@@ -101,6 +129,26 @@ public void onEvent(com.github.shyiko.mysql.binlog.event.Event event) {
        }
    }

    public void stopClient() {
        try {
            binaryLogClient.disconnect();
You may want to add a debug/info log message when the client is disconnected. This will help during performance testing.
Added.
    acknowledgementSet.add(transformedEvent);
}

bufferAccumulator.add(new Record<>(transformedEvent));
BufferAccumulator is not thread-safe. I would recommend instantiating it within the run method itself instead of passing it to this thread, or avoiding the accumulator altogether if it is not really required.
Good point! I moved the instantiation into the run method.
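As a rough sketch of the pattern discussed here, the example below creates a non-thread-safe accumulator inside `run()` so that each thread gets its own instance. `SimpleAccumulator` and `Loader` are hypothetical stand-ins written for this illustration; they are not the Data Prepper `BufferAccumulator` or the loader class from this PR, and the batch size of 2 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

final class ThreadConfinedAccumulatorSketch {

    /** Hypothetical stand-in for a batching accumulator that is NOT thread-safe. */
    static final class SimpleAccumulator {
        private final List<String> pending = new ArrayList<>(); // unsynchronized state
        private final Consumer<List<String>> sink;
        private final int batchSize;

        SimpleAccumulator(Consumer<List<String>> sink, int batchSize) {
            this.sink = sink;
            this.batchSize = batchSize;
        }

        void add(String record) {
            pending.add(record);
            if (pending.size() >= batchSize) {
                flush();
            }
        }

        void flush() {
            if (!pending.isEmpty()) {
                sink.accept(new ArrayList<>(pending));
                pending.clear();
            }
        }
    }

    /** Hypothetical loader task; the sink it writes to must itself be thread-safe. */
    static final class Loader implements Runnable {
        private final List<String> records;
        private final Consumer<List<String>> sink;

        Loader(List<String> records, Consumer<List<String>> sink) {
            this.records = records;
            this.sink = sink;
        }

        @Override
        public void run() {
            // Created inside run(), so the accumulator is confined to the executing thread.
            SimpleAccumulator accumulator = new SimpleAccumulator(sink, 2);
            for (String record : records) {
                accumulator.add(record);
            }
            accumulator.flush(); // push any partial batch
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> buffer = new ConcurrentLinkedQueue<>();
        Thread first = new Thread(new Loader(List.of("a", "b", "c"), buffer::addAll));
        Thread second = new Thread(new Loader(List.of("d", "e"), buffer::addAll));
        first.start();
        second.start();
        first.join();
        second.join();
        System.out.println(buffer.size()); // prints 5
    }
}
```

The shared destination still has to be safe for concurrent writers; only the per-task batching state is confined to a single thread.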
} else {
    exportFileErrorCounter.increment();
    LOG.warn("Negative acknowledgment received for data file {}, retrying", dataFilePartition.getKey());
    sourceCoordinator.giveUpPartition(dataFilePartition);
This is just me trying to understand the code here, not a real comment about the code.
I assume the retry on this partition, and keeping track of the retry count, is handled by the sourceCoordinator?
Yes. Once the data file partition is given up, it becomes available to acquire again, and the file load will be retried by whichever node picks up the partition.
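To make the give-up-and-retry flow concrete, here is a toy model of it. `ToyCoordinator` is a hypothetical in-memory stand-in invented for this sketch; it is not the Data Prepper source coordinator API, and it does not track retry counts or ownership timeouts.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

final class GiveUpRetrySketch {

    /** Hypothetical in-memory stand-in for a source coordinator. */
    static final class ToyCoordinator {
        private final Deque<String> available = new ArrayDeque<>();

        synchronized void addPartition(String key) {
            available.addLast(key);
        }

        /** Hands out the next available partition, if any. */
        synchronized Optional<String> acquire() {
            return Optional.ofNullable(available.pollFirst());
        }

        /** Returns the partition to the pool so any node can acquire it again. */
        synchronized void giveUp(String key) {
            available.addLast(key);
        }

        /** Marks the partition done; it was already removed from the pool on acquire. */
        synchronized void complete(String key) {
            // nothing to do in this toy model
        }
    }

    /** Mirrors the shape of an acknowledgment callback: true means a positive ack. */
    static void onAcknowledgment(ToyCoordinator coordinator, String key, boolean positive) {
        if (positive) {
            coordinator.complete(key);
        } else {
            coordinator.giveUp(key); // the file load is retried by whoever acquires it next
        }
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        coordinator.addPartition("file-1");

        String key = coordinator.acquire().orElseThrow();
        onAcknowledgment(coordinator, key, false);             // negative ack
        System.out.println(coordinator.acquire().isPresent()); // prints true

        onAcknowledgment(coordinator, key, true);              // positive ack on the retry
        System.out.println(coordinator.acquire().isPresent()); // prints false
    }
}
```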
Description
This PR adds:
Issues Resolved
Contributes to #4561
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.