Adding subshard work items on lease expiry #1160

Merged: 21 commits, Dec 5, 2024

Commits
55a83ca
Checkpoint - code in place for subshard work items, need to test
chelma Nov 22, 2024
3429a5a
Improved cursor plumbing for RFS SubShard work items
chelma Nov 25, 2024
2aae632
Additional changes per PR comments
chelma Nov 25, 2024
2b33a84
Merge remote-tracking branch 'upstream/main' into MIGRATIONS-2128
AndreKurait Nov 25, 2024
2c2a708
Modify LuceneDocumentsReader to read docs/segments sequentially
AndreKurait Nov 25, 2024
56839cd
Refactor of partial shard work items - added sequential doc reading, …
AndreKurait Dec 2, 2024
cf6ed86
Fix spotless issues
AndreKurait Dec 2, 2024
920be77
Working subshard
AndreKurait Dec 3, 2024
d8c4372
Rename numAttempts to leaseAcquisitionExponent and add max exponent b…
AndreKurait Dec 4, 2024
40eca92
Add worker cancellation on lease expiration
AndreKurait Dec 4, 2024
6211c33
Fix lucene starting doc id
AndreKurait Dec 4, 2024
e403228
Add lease duration decrease if shard setup is < 2.5% of lease time
AndreKurait Dec 4, 2024
e9ce08e
Fix WorkCoordinatorTest.java
AndreKurait Dec 4, 2024
5d82fbe
Add LeaseExpirationTest
AndreKurait Dec 5, 2024
e4be465
Fix scheduler dispose
AndreKurait Dec 5, 2024
b5640f5
Merge branch 'main' into MIGRATIONS-2128
AndreKurait Dec 5, 2024
8494eec
Address spotless
AndreKurait Dec 5, 2024
9820fa1
Address comments for LeaseExpirationTest
AndreKurait Dec 5, 2024
2d3ed9c
Update messaging on deletedDocs
AndreKurait Dec 5, 2024
c4dcbc4
Update RFS Design doc with successor work items
AndreKurait Dec 5, 2024
178fe55
Fix WorkCoordinatorTest
AndreKurait Dec 5, 2024
RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java

@@ -128,7 +128,7 @@
}
// Otherwise, shift the SegmentReaders to the front
else if (leafReader1 instanceof SegmentReader && !(leafReader2 instanceof SegmentReader)) {
log.info("Found non-SegmentReader of type {} in the DirectoryReader", leafReader2.getClass().getName());

Check failure on line 131 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java (GitHub Actions / Run SonarQube Analysis), java:S1192: Define a constant instead of duplicating this literal "Found non-SegmentReader of type {} in the DirectoryReader" 4 times.
return -1;
} else if (!(leafReader1 instanceof SegmentReader) && leafReader2 instanceof SegmentReader) {
log.info("Found non-SegmentReader of type {} in the DirectoryReader", leafReader1.getClass().getName());
@@ -177,20 +177,20 @@
.doOnTerminate(sharedSegmentReaderScheduler::dispose);
}

Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext, int docCommitId, Scheduler scheduler,
Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext, int docStartingId, Scheduler scheduler,
int concurrency) {
var segmentReader = leafReaderContext.reader();
var liveDocs = segmentReader.getLiveDocs();

int segmentDocBase = leafReaderContext.docBase;

log.atInfo().setMessage("For segment: {}, working on docCommitId: {}")
log.atInfo().setMessage("For segment: {}, working on docStartingId: {}")
.addArgument(leafReaderContext)
.addArgument(docCommitId)
.addArgument(docStartingId)
.log();

return Flux.range(0, segmentReader.maxDoc())
.skipWhile(id -> id + segmentDocBase <= docCommitId && docCommitId != 0)
.skipWhile(docNum -> segmentDocBase + docNum < docStartingId)
Collaborator comment:
So between this & the concatMap above that calls this, we'll race through every the #. of docs for every prior segment? If I have thousands or millions of segments, even if I'm just skip counting, this seems strange. I'm less worried about performance and more thinking of debugging, why not just snap to the starting spot rather than having a flux do the counting?

.flatMapSequentialDelayError(docIdx -> Mono.defer(() -> {
try {
if (liveDocs == null || liveDocs.get(docIdx)) {
@@ -218,7 +218,7 @@
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase) {
Document document;
try {
document = reader.document(luceneDocId);

Check failure on line 221 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java (GitHub Actions / Run SonarQube Analysis), java:S1874: Remove this use of "document"; it is deprecated.
} catch (IOException e) {
log.atError().setCause(e).setMessage("Failed to read document at Lucene index location {}")
.addArgument(luceneDocId).log();
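A minimal sketch of the non-deprecated lookup that the java:S1874 finding above points toward, assuming a Lucene version that exposes IndexReader.storedFields() (roughly 9.5 and later); this is not part of the PR:

    import java.io.IOException;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.IndexReader;
    import org.apache.lucene.index.StoredFields;

    // Hypothetical replacement for the deprecated reader.document(luceneDocId) call.
    static Document readStoredDocument(IndexReader reader, int luceneDocId) throws IOException {
        StoredFields storedFields = reader.storedFields(); // non-deprecated accessor in newer Lucene
        return storedFields.document(luceneDocId);
    }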
RfsLuceneDocument.java

@@ -1,12 +1,14 @@
package org.opensearch.migrations.bulkload.common;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* This class represents a document at the Lucene level within RFS. It tracks where the document was within the Lucene
* index, as well as the document's embedded Elasticsearch/OpenSearch properties
*/
@RequiredArgsConstructor
@Getter
public class RfsLuceneDocument {
// The Lucene document number of the document
public final int luceneDocNumber;
@@ -82,7 +82,7 @@ static Stream<Arguments> provideSnapshots() {

@ParameterizedTest
@MethodSource("provideSnapshots")
public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version version) throws Exception {
public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version version) {
final var repo = new FileSystemRepo(snapshot.dir);
var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(version, repo);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(repo);
@@ -100,8 +100,7 @@ public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version ve
// Use the LuceneDocumentsReader to get the documents
var reader = LuceneDocumentsReader.getFactory(sourceResourceProvider).apply(luceneDir);

Flux<RfsLuceneDocument> documents = reader.readDocuments()
.sort(Comparator.comparing(doc -> doc.id)); // Sort for consistent order given LuceneDocumentsReader may interleave
Flux<RfsLuceneDocument> documents = reader.readDocuments();

// Verify that the results are as expected
StepVerifier.create(documents).expectNextMatches(doc -> {
@@ -116,25 +115,25 @@ public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version ve
assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource);
return true;
}).expectNextMatches(doc -> {
String expectedId = "unchangeddoc";
String expectedId = "updateddoc";
String actualId = doc.id;

String expectedType = null;
String actualType = doc.type;

String expectedSource = "{\"title\":\"This doc will not be changed\\nIt has multiple lines of text\\nIts source doc has extra newlines.\",\"content\":\"bluh bluh\"}";
String expectedSource = "{\"title\":\"This is doc that will be updated\",\"content\":\"Updated!\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType,
expectedSource, actualSource);
return true;
}).expectNextMatches(doc -> {
String expectedId = "updateddoc";
String expectedId = "unchangeddoc";
String actualId = doc.id;

String expectedType = null;
String actualType = doc.type;

String expectedSource = "{\"title\":\"This is doc that will be updated\",\"content\":\"Updated!\"}";
String expectedSource = "{\"title\":\"This doc will not be changed\\nIt has multiple lines of text\\nIts source doc has extra newlines.\",\"content\":\"bluh bluh\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType,
expectedSource, actualSource);
@@ -143,7 +142,7 @@ public void ReadDocuments_AsExpected(TestResources.Snapshot snapshot, Version ve
}

@Test
public void ReadDocuments_ES5_Origin_AsExpected() throws Exception {
public void ReadDocuments_ES5_Origin_AsExpected() {
TestResources.Snapshot snapshot = TestResources.SNAPSHOT_ES_6_8_MERGED;
Version version = Version.fromString("ES 6.8");

@@ -164,41 +163,40 @@ public void ReadDocuments_ES5_Origin_AsExpected() {
// Use the LuceneDocumentsReader to get the documents
var reader = LuceneDocumentsReader.getFactory(sourceResourceProvider).apply(luceneDir);

Flux<RfsLuceneDocument> documents = reader.readDocuments()
.sort(Comparator.comparing(doc -> doc.id)); // Sort for consistent order given LuceneDocumentsReader may interleave
Flux<RfsLuceneDocument> documents = reader.readDocuments();

// Verify that the results are as expected
StepVerifier.create(documents).expectNextMatches(doc -> {
String expectedId = "complexdoc";
String expectedId = "unchangeddoc";
String actualId = doc.id;

String expectedType = "type1";
String actualType = doc.type;
String expectedType = "type2";
String actualType = doc.type;

String expectedSource = "{\"title\":\"This is a doc with complex history. Updated!\"}";
String expectedSource = "{\"content\":\"This doc will not be changed\nIt has multiple lines of text\nIts source doc has extra newlines.\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType,
expectedSource, actualSource);
assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource);
return true;
}).expectNextMatches(doc -> {
String expectedId = "unchangeddoc";
String expectedId = "updateddoc";
String actualId = doc.id;

String expectedType = "type2";
String actualType = doc.type;
String expectedType = "type2";
String actualType = doc.type;

String expectedSource = "{\"content\":\"This doc will not be changed\nIt has multiple lines of text\nIts source doc has extra newlines.\"}";
String expectedSource = "{\"content\":\"Updated!\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType, expectedSource, actualSource);
assertDocsEqual(expectedId, actualId, expectedType, actualType,
expectedSource, actualSource);
return true;
}).expectNextMatches(doc -> {
String expectedId = "updateddoc";
String expectedId = "complexdoc";
String actualId = doc.id;

String expectedType = "type2";
String expectedType = "type1";
String actualType = doc.type;

String expectedSource = "{\"content\":\"Updated!\"}";
String expectedSource = "{\"title\":\"This is a doc with complex history. Updated!\"}";
String actualSource = doc.source;
assertDocsEqual(expectedId, actualId, expectedType, actualType,
Collaborator comment on lines +170 to 201:
just verifying that these orders changed because we are now sorting segments & docs?

@AndreKurait (Member), Dec 5, 2024:
Correct, we're not necessarily sorting the docs, but keeping a consistent ordering of them
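A minimal sketch, pieced together from the comparator fragments earlier in this diff, of an ordering that is consistent without sorting by doc id: SegmentReaders are shifted ahead of other leaf readers and everything else keeps its relative order. The comparator name is hypothetical and not from the PR:

    import java.util.Comparator;
    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.SegmentReader;

    // Hypothetical simplification of the ordering visible in the diff above:
    // SegmentReaders come first; ties return 0 so existing relative order is preserved.
    static final Comparator<LeafReader> SEGMENT_READERS_FIRST = (leafReader1, leafReader2) -> {
        boolean firstIsSegment = leafReader1 instanceof SegmentReader;
        boolean secondIsSegment = leafReader2 instanceof SegmentReader;
        if (firstIsSegment == secondIsSegment) {
            return 0;
        }
        return firstIsSegment ? -1 : 1;
    };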

expectedSource, actualSource);
@@ -278,15 +276,15 @@ protected DirectoryReader getReader() {
}

@Test
public void ReadDocumentsStartingFromCheckpointForOneSegments_AsExpected() throws Exception {
public void ReadDocumentsStartingFromCheckpointForOneSegments_AsExpected() {
// This snapshot has 6 documents in 1 segment. There are updates and deletes involved, so
// there are only 3 final documents, which affects which document id the reader should
// start at.
var snapshot = TestResources.SNAPSHOT_ES_7_10_W_SOFT;
var version = Version.fromString("ES 7.10");
List<List<String>> documentIds = List.of(
List.of("complexdoc", "unchangeddoc", "updateddoc"),
List.of("unchangeddoc", "updateddoc"),
List.of("complexdoc", "updateddoc", "unchangeddoc"),
List.of("updateddoc", "unchangeddoc"),
List.of("unchangeddoc"));
List<Integer> documentStartingIndices = List.of(0, 2, 5);

@@ -309,8 +307,7 @@ public void ReadDocumentsStartingFromCheckpointForOneSegments_AsExpected() throw


for (int i = 0; i < documentStartingIndices.size(); i++) {
Flux<RfsLuceneDocument> documents = reader.readDocuments(documentStartingIndices.get(i))
.sort(Comparator.comparing(doc -> doc.id)); // Sort for consistent order given LuceneDocumentsReader may interleave
Flux<RfsLuceneDocument> documents = reader.readDocuments(documentStartingIndices.get(i));

var actualDocIds = documents.collectList().block().stream().map(doc -> doc.id).collect(Collectors.joining(","));
var expectedDocIds = String.join(",", documentIds.get(i));
@@ -324,8 +321,8 @@ public void ReadDocumentsStartingFromCheckpointForManySegments_AsExpected() thro
var snapshot = TestResources.SNAPSHOT_ES_6_8;
var version = Version.fromString("ES 6.8");
List<List<String>> documentIds = List.of(
List.of("complexdoc", "unchangeddoc", "updateddoc"),
List.of("unchangeddoc", "updateddoc"),
List.of("complexdoc", "updateddoc", "unchangeddoc"),
List.of("updateddoc", "unchangeddoc"),
List.of("unchangeddoc"));

final var repo = new FileSystemRepo(snapshot.dir);
@@ -346,11 +343,11 @@ public void ReadDocumentsStartingFromCheckpointForManySegments_AsExpected() thro
var reader = LuceneDocumentsReader.getFactory(sourceResourceProvider).apply(luceneDir);


for (int i = 0; i < documentIds.size(); i++) {
Flux<RfsLuceneDocument> documents = reader.readDocuments(0);
for (int startingDocIndex = 0; startingDocIndex < documentIds.size(); startingDocIndex++) {
Flux<RfsLuceneDocument> documents = reader.readDocuments(startingDocIndex);

var actualDocIds = documents.collectList().block().stream().map(doc -> doc.id).collect(Collectors.joining(","));
var expectedDocIds = String.join(",", documentIds.get(i));
var expectedDocIds = String.join(",", documentIds.get(startingDocIndex));
Assertions.assertEquals(expectedDocIds, actualDocIds);
}
}