How to leverage Flux to preserve the sequence of bulk puts... #1157

gregschohn
Collaborator

Description

This is a partial, rough implementation of AndreKurait's idea: sequence the bulk put responses into a Flux that effectively serves as a progress report. We can then use that Flux to monitor and update the last watermark we have reached, so that the work continuation record can be written when a lease expires.
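The ordering idea can be sketched without Reactor, using only `java.util.concurrent`. This is a minimal illustration, not the PR's code: `OrderedProgressSketch`, `Cursor`, `sendBulk`, and `reportInOrder` are hypothetical names, `Cursor` is only a stand-in for `SegmentDocumentCursor`, and the delays simulate bulk puts that finish out of order. Responses are consumed in submission order, so the watermark never moves past a batch that is still in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderedProgressSketch {
    /** Hypothetical stand-in for SegmentDocumentCursor: the last doc id in a batch. */
    public static final class Cursor {
        public final int lastDocId;
        public Cursor(int lastDocId) { this.lastDocId = lastDocId; }
    }

    /** Hypothetical stand-in for a bulk put that completes after delayMillis. */
    public static CompletableFuture<Cursor> sendBulk(int lastDocId, long delayMillis) {
        return CompletableFuture.supplyAsync(
            () -> new Cursor(lastDocId),
            CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    /**
     * Consumes responses in submission order, whatever order they complete in,
     * and records each batch's last doc id as the new watermark.
     */
    public static List<Integer> reportInOrder(
            List<CompletableFuture<Cursor>> inFlight, AtomicInteger watermark) {
        List<Integer> observed = new ArrayList<>();
        for (CompletableFuture<Cursor> response : inFlight) {
            Cursor cursor = response.join(); // blocks until this batch is done
            watermark.set(cursor.lastDocId);
            observed.add(cursor.lastDocId);
        }
        return observed;
    }

    public static void main(String[] args) {
        // All three batches are in flight at once; the first is the slowest.
        List<CompletableFuture<Cursor>> inFlight = List.of(
            sendBulk(99, 150), sendBulk(199, 50), sendBulk(299, 10));
        AtomicInteger watermark = new AtomicInteger(-1);
        List<Integer> observed = reportInOrder(inFlight, watermark);
        // Prints: [99, 199, 299] watermark=299
        System.out.println(observed + " watermark=" + watermark.get());
    }
}
```

This sketch blocks the calling thread on each `join()`. In Reactor, an order-preserving operator such as `Flux.flatMapSequential` or `Flux.concatMap` is one way to get the same ordering without blocking; which operator the final implementation uses is not stated here.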

TODO:

  • Unit tests compile/pass

  • sendBulkRequest takes in the segment/docIds (at least the last one) & that's wired to the SegmentDocumentCursor

  • exitOnLeaseTimeout() is enhanced to create a new work item from the lastDocIndexed

  • Make exitOnLeaseTimeout() easily transferable to MIGRATIONS-2172

  • Category: Enhancement

  • Why these changes are required? To support preserving backfill work even when a process is terminated.

  • What is the old behavior before changes and new behavior after changes? See Jira.

Testing

Tests break - this PR is to communicate the idea.

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

… the sequence of bulk puts in Document Migration to have a flux that effectively reports progress.

Signed-off-by: Greg Schohn <[email protected]>
@@ -76,7 +75,7 @@ Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String
 .log())
 // Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
 .onErrorResume(e -> Mono.empty())
-.then() // Discard the response object
+.then(Mono.just(new SegmentDocumentCursor())) // Discard the response object
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: this constructor would take in the last docsBatch

@@ -307,6 +310,7 @@ public static void main(String[] args) throws Exception {
run(
LuceneDocumentsReader.getFactory(sourceResourceProvider),
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: we'll need to make the LuceneDocumentsReader read in order too

@AndreKurait
Member

Thanks greg, remaining work tracked in #1160

@gregschohn gregschohn deleted the RfsFluxListenerForProgress branch December 10, 2024 15:45