
Adding subshard work items on lease expiry #1160

Merged: 21 commits merged into opensearch-project:main on Dec 5, 2024

Conversation

@chelma (Member) commented Nov 22, 2024:

Description

  • Added code to finish the current work item and create a remainder work item when a lease expires.

Behavior changes are as follows:

  • Updates the LuceneDocumentsReader to sort the segments and emit docs in sequence, skipping docs until the passed-in startingDocId is reached. Note: docs within a segment are still read in parallel; they are just emitted in sequence once aggregated together.
  • Updates the RfsLuceneDocument to contain the luceneDocId (segmentBaseDoc + docId)
  • Updates the DocumentReindexer to emit a flux of the latest sequential docId processed (see the sketch after this list)
  • Updates the DocumentsRunner to plumb context from work item progress and cancellation to the LeaseEnd
  • Updates the OpenSearchWorkCoordinator to rename numAttempts to nextAcquisitionLeaseExponent (incrementing the script version from poc to 2.0)
  • Updates exitOnLeaseTimeout to handle cancelling document reindexing work and creating the successor work item based on the progress checkpoint and shard work timing. Logic below.
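
A minimal sketch of the progress-emission idea described above, assuming the reader emits docs in sequence and each doc carries the global luceneDocId; the class, batch size, and method names here are illustrative, not the actual DocumentReindexer code:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import java.util.List;

    class ProgressEmittingReindexerSketch {
        // Hypothetical document type carrying the global luceneDocId described above.
        static class Doc {
            final int luceneDocId;
            Doc(int luceneDocId) { this.luceneDocId = luceneDocId; }
        }

        // Index docs in batches and emit the latest sequential doc id processed, which a
        // lease-expiry handler can use as the checkpoint for the successor work item.
        Flux<Integer> reindexWithProgress(Flux<Doc> docsInSequence) {
            return docsInSequence
                .buffer(1000)                                   // illustrative batch size
                .concatMap(batch -> sendBulk(batch)
                    .thenReturn(batch.get(batch.size() - 1).luceneDocId))
                .scan(Math::max);                               // highest doc id seen so far
        }

        // Stand-in for the real bulk-indexing call.
        Mono<Void> sendBulk(List<Doc> batch) {
            return Mono.empty();
        }
    }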

The lease time adjustment logic has changed. Behavior is as follows (a sketch of this decision appears after the list):

  • If the worker did not have enough time to process any docs, the lease time is doubled for the next run.
  • else
    • If the worker spent more than 10% of its time downloading/extracting the shard, double the lease time for the next run
    • else if the worker spent less than 2.5% of its time downloading/extracting the shard, halve the lease time for the next run
    • else keep the lease time the same for the successor run
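
A rough sketch of that decision, assuming the lease duration scales as a base duration times 2 to the nextAcquisitionLeaseExponent; the thresholds mirror the description above, while the method and parameter names are illustrative:

    class LeaseScalingSketch {
        // Decide the successor work item's lease exponent from what happened during this run.
        static int successorExponent(int currentExponent,
                                     boolean anyDocsProcessed,
                                     double shardSetupFractionOfLease) {
            if (!anyDocsProcessed) {
                return currentExponent + 1;              // no docs finished: double the lease
            }
            if (shardSetupFractionOfLease > 0.10) {
                return currentExponent + 1;              // download/extract dominated: double
            }
            if (shardSetupFractionOfLease < 0.025) {
                return Math.max(0, currentExponent - 1); // setup was trivial: halve the lease
            }
            return currentExponent;                      // otherwise keep the same lease time
        }
    }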

Added an E2E test as follows:

  • Create docs with the workload generator, then set up toxiproxy and lease durations so that finishing the shard with checkpoints takes eight 20-second leases. Verify the exit codes and that all docs migrated.

Issues Resolved

Testing

Tested in AWS and added a new E2E test around the scenario.

Check List

  • New functionality includes testing
    • All tests pass, including unit test, integration test and doctest
  • New functionality has been documented
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

/**
* We need to ensure a stable ordering of segments so we can start reading from a specific segment and document id.
* To do this, we sort the segments by their name.
*/
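
An illustrative sketch of what sorting leaves by segment name could look like; the class is not the PR's code, and the empty-string fallback is an assumption (the caveat raised below is that not every leaf is guaranteed to be a SegmentReader):

    import java.util.Comparator;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.index.SegmentReader;

    class SegmentOrderingSketch {
        // Sort leaves by segment name so a reader can resume from a specific segment/doc id.
        static List<LeafReaderContext> sortedLeaves(DirectoryReader reader) {
            return reader.leaves().stream()
                .sorted(Comparator.comparing((LeafReaderContext ctx) -> {
                    LeafReader leafReader = ctx.reader();
                    return leafReader instanceof SegmentReader
                        ? ((SegmentReader) leafReader).getSegmentName()
                        : "";   // non-SegmentReader leaves have no segment name to sort by
                }))
                .collect(Collectors.toList());
        }
    }
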
Collaborator commented:
This won't work, IIRC - Mikayla was doing something like this and not everything will be a SegmentReader. She was originally using 'name', but it wasn't always present. We need some other way to put these into a canonical order - anything that's stored in the metadata that's stable and unique would work. Size wouldn't be unique, so that could be a problem...

Does the metadata not have these in a stable order that's pulled the same way every time?

Member Author commented:
@AndreKurait and I found a simple solution here.

Collaborator commented:
What's the simple solution? Why is this code still here? I didn't think that we needed this.
If we have to sort, can we sort on the starting document # at the beginning of each segment?

…removed shard number and index from every doc checkpoint, and based checkpoint on global shard doc number with +1 to fix behavior where last doc was replayed again. Todo: adjust other unit tests

…ased on 10% of worker time downloading

@AndreKurait AndreKurait changed the title IN-PROGRESS: Adding subshard work items on lease expiry Adding subshard work items on lease expiry Dec 5, 2024
@AndreKurait AndreKurait marked this pull request as ready for review December 5, 2024 15:22
codecov bot commented Dec 5, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 80.74%. Comparing base (3d82d1f) to head (178fe55).
Report is 22 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1160      +/-   ##
============================================
- Coverage     80.97%   80.74%   -0.24%     
- Complexity     2996     3003       +7     
============================================
  Files           407      409       +2     
  Lines         15241    15408     +167     
  Branches       1021     1031      +10     
============================================
+ Hits          12342    12441      +99     
- Misses         2272     2338      +66     
- Partials        627      629       +2     
Flag Coverage Δ
unittests 80.74% <ø> (-0.24%) ⬇️

Flags with carried forward coverage won't be shown.


Comment on lines +170 to 201
String expectedId = "unchangeddoc";
String actualId = doc.id;

String expectedType = "type1";
String actualType = doc.type;
String expectedType = "type2";
String actualType = doc.type;

String expectedSource = "{\"title\":\"This is a doc with complex history. Updated!\"}";
String expectedSource = "{\"content\":\"This doc will not be changed\nIt has multiple lines of text\nIts source doc has extra newlines.\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType,
expectedSource, actualSource);
assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource);
return true;
}).expectNextMatches(doc -> {
String expectedId = "unchangeddoc";
String expectedId = "updateddoc";
String actualId = doc.id;

String expectedType = "type2";
String actualType = doc.type;
String expectedType = "type2";
String actualType = doc.type;

String expectedSource = "{\"content\":\"This doc will not be changed\nIt has multiple lines of text\nIts source doc has extra newlines.\"}";
String expectedSource = "{\"content\":\"Updated!\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource);
assertDocsEqual(expectedId, actualId, expectedType, actualType,
expectedSource, actualSource);
return true;
}).expectNextMatches(doc -> {
String expectedId = "updateddoc";
String expectedId = "complexdoc";
String actualId = doc.id;

String expectedType = "type2";
String expectedType = "type1";
String actualType = doc.type;

String expectedSource = "{\"content\":\"Updated!\"}";
String expectedSource = "{\"title\":\"This is a doc with complex history. Updated!\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType,
Collaborator commented:
just verifying that these orders changed because we are now sorting segments & docs?

@AndreKurait (Member) commented Dec 5, 2024:
Correct, we're not necessarily sorting the docs, but keeping a consistent ordering of them.

@AndreKurait AndreKurait merged commit f2e93a4 into opensearch-project:main Dec 5, 2024
22 checks passed
@gregschohn (Collaborator) left a comment:
I haven't looked through the tests, but some of the comments that I've left are significant. I hope to see them acknowledged before they're released.

.addArgument(workItemId)
.log();
if (progressCursorRef.get() != null) {
log.atWarn().setMessage("Progress cursor set, cancelling active doc migration if still running").log();
Collaborator commented:
what does 'if' mean here?

.log();
if (progressCursorRef.get() != null) {
log.atWarn().setMessage("Progress cursor set, cancelling active doc migration if still running").log();
cancellationRunnable.run();
Collaborator commented:
There isn't a guarantee on how long this should take to run, so this would be good to put into an async thread and to only wait for so many seconds (maybe 10% of the time that we've allocated to wrapping up?)
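
One way to bound that wait, sketched with a plain executor; the 10% figure comes from the comment above, while the class, method, and parameter names are illustrative rather than the PR's code:

    import java.time.Duration;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    class BoundedCancellationSketch {
        // Run the cancellation off-thread and only wait for a slice of the wrap-up budget.
        static void cancelWithTimeout(Runnable cancellationRunnable, Duration wrapUpBudget) {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                Future<?> cancellation = executor.submit(cancellationRunnable);
                cancellation.get(wrapUpBudget.toMillis() / 10, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                // Timed out or failed; proceed with creating the successor work item anyway.
            } finally {
                executor.shutdownNow();
            }
        }
    }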

workerId)
workerId,
Clock.systemUTC(),
workItemRef::set);
Collaborator commented:
There's a smell here. The coordinator code returns a value and it also runs a hook with the same value that it's returning. You need to monitor the progress cursor too, and you push that responsibility onto the next layer down. Combining this with the progress monitoring and extracting it from the coordinator both seem like they'd make the code more maintainable and easier to follow.

What really matters is what the DocumentRunner is using, not what the coordinator returned. This is plumbing some brittle assumptions through the codebase that work today but add debt.

workerId,
Clock.systemUTC(),
workItemRef::set);
var processManager = new LeaseExpireTrigger(
Collaborator commented:
This is pretty unwieldy. Most of these values should probably be fields in the instance and exitOnLeaseTimeout should be leveraging a method to do the work continuation instead of a static. Most of the logic w/in that function is relevant for 2 other critical use cases too - shutdown from SIGTERM and unexpected exceptions (including OOM). Doing a refactor to make the inner cleanup method easier and more compact will make enabling those other use cases trivial.

I also cannot figure out why the processManager object doesn't get created within the run method - even if constructed via a supplier if there is some policy that we want to thread. It's all around hard to follow (sorry, I think that some of this is from my initial work to put this here in the first place)

Comment on lines +386 to +387
throw new IllegalStateException("Unexpected state with progressCursor set without a" +
"work item");
Collaborator commented:
This feels like an assertion that we should be doing when we're setting the progress - or something that by virtue of our model is fundamentally impossible to express. The model for where/when we're hooking these updates is part of the issue (see the comment about the hook in the coordinator).

@@ -123,26 +123,39 @@ static class WorkItemWithPotentialSuccessors {
private final ObjectMapper objectMapper;
@Getter
private final Clock clock;
private final Consumer<WorkItemAndDuration> workItemConsumer;
Collaborator commented:
See comment in the main class - this class shouldn't need to update a consumer.
We should already have a context that we're publishing telemetry to - but more importantly, I don't understand what the semantics would be. If one class is invoked for 4 different work items, but calls these triggers in an arbitrary order, is that ok? What's the overall contract between caller and callee? What happens if the callee throws, etc.? Callbacks and hooks are incredibly useful, but they add complexity, and when we're returning the values already, we should opt to keep it simple.

var docMigrationCursors = setupDocMigration(workItem.getWorkItem(), context);
var latch = new CountDownLatch(1);
var finishScheduler = Schedulers.newSingle( "workFinishScheduler");
var disposable = docMigrationCursors
Collaborator commented:
Why is this not its own cancel/dispose (or autocloseable) method? Are you trying to make sure that only the creator has access to cancel the class via this back-door hook? That doesn't seem like it needs to be a critical concern and it makes the code much harder to maintain
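
A sketch of the shape this comment seems to ask for, exposing cancellation as a first-class close/dispose on the runner rather than a hook handed back to the creator; the class and field names here are illustrative, not the current code:

    import reactor.core.Disposable;

    class CancellableMigrationSketch implements AutoCloseable {
        private volatile Disposable migrationSubscription;   // set when the doc migration is subscribed

        void start(Disposable subscription) {
            this.migrationSubscription = subscription;
        }

        @Override
        public void close() {
            Disposable subscription = migrationSubscription;
            if (subscription != null && !subscription.isDisposed()) {
                subscription.dispose();                       // cancel any in-flight doc migration
            }
        }
    }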

.subscribe(lastItem -> {},
error -> log.atError()
.setCause(error)
.setMessage("Error prevented all batches from being processed")
Collaborator commented:
what are 'all batches'? That sounds like it could be the whole run of data - but I don't think that it is

.addArgument(workItem.getWorkItem().getIndexName())
.addArgument(workItem.getWorkItem().getShardNumber())
.log();
latch.countDown();
Collaborator commented:
does this get called if there's an error? We await on the latch below - will we terminate in that case?

@@ -862,7 +885,7 @@ public void createSuccessorWorkItemsAndMarkComplete(
e -> ctx.addTraceException(e, true)
);
retryWithExponentialBackoff(
() -> createUnassignedWorkItemsIfNonexistent(successorWorkItemIds),
() -> createUnassignedWorkItemsIfNonexistent(successorWorkItemIds, successorNextAcquisitionLeaseExponent),
Collaborator commented:
nit: newline
