Adding subshard work items on lease expiry #1160

Status: Merged (21 commits, Dec 5, 2024). The diff below shows changes from 5 of the 21 commits.

Commits
55a83ca: Checkpoint - code in place for subshard work items, need to test (chelma, Nov 22, 2024)
3429a5a: Improved cursor plumbing for RFS SubShard work items (chelma, Nov 25, 2024)
2aae632: Additional changes per PR comments (chelma, Nov 25, 2024)
2b33a84: Merge remote-tracking branch 'upstream/main' into MIGRATIONS-2128 (AndreKurait, Nov 25, 2024)
2c2a708: Modify LuceneDocumentsReader to read docs/segments sequentially (AndreKurait, Nov 25, 2024)
56839cd: Refactor of partial shard work items - added sequential doc reading, … (AndreKurait, Dec 2, 2024)
cf6ed86: Fix spotless issues (AndreKurait, Dec 2, 2024)
920be77: Working subshard (AndreKurait, Dec 3, 2024)
d8c4372: Rename numAttempts to leaseAcquisitionExponent and add max exponent b… (AndreKurait, Dec 4, 2024)
40eca92: Add worker cancellation on lease expiration (AndreKurait, Dec 4, 2024)
6211c33: Fix lucene starting doc id (AndreKurait, Dec 4, 2024)
e403228: Add lease duration decrease if shard setup is < 2.5% of lease time (AndreKurait, Dec 4, 2024)
e9ce08e: Fix WorkCoordinatorTest.java (AndreKurait, Dec 4, 2024)
5d82fbe: Add LeaseExpirationTest (AndreKurait, Dec 5, 2024)
e4be465: Fix scheduler dispose (AndreKurait, Dec 5, 2024)
b5640f5: Merge branch 'main' into MIGRATIONS-2128 (AndreKurait, Dec 5, 2024)
8494eec: Address spotless (AndreKurait, Dec 5, 2024)
9820fa1: Address comments for LeaseExpirationTest (AndreKurait, Dec 5, 2024)
2d3ed9c: Update messaging on deletedDocs (AndreKurait, Dec 5, 2024)
c4dcbc4: Update RFS Design doc with successor work items (AndreKurait, Dec 5, 2024)
178fe55: Fix WorkCoordinatorTest (AndreKurait, Dec 5, 2024)
@@ -53,7 +53,8 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workl
log.atTrace().setMessage("Created doc for index {}: {}")
.addArgument(indexName)
.addArgument(doc::toString).log();
return new BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), indexName, null, doc.toString());
var docId = docIdCounter.incrementAndGet();
return new BulkDocSection(indexName + "_" + docId, indexName, null, doc.toString(), null);
})
.collect(Collectors.toList());

DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java
@@ -5,8 +5,11 @@
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor;
import org.opensearch.migrations.bulkload.common.DocumentReindexer;
@@ -20,12 +23,14 @@
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.models.ShardMetadata;
import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts;
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
@@ -45,6 +50,7 @@
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

@@ -176,7 +182,7 @@
public String getTransformerConfigParameterArgPrefix() {
return DOC_CONFIG_PARAMETER_ARG_PREFIX;
}
final static String DOC_CONFIG_PARAMETER_ARG_PREFIX = "doc-";

SonarQube check failure on line 185 in DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java (java:S1124): Reorder the modifiers to comply with the Java Language Specification.
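A minimal sketch of the reordering that the S1124 finding asks for; the constant is the one flagged above, and the wrapper class exists only so the snippet compiles on its own:

```java
// Sketch of the java:S1124 fix; the wrapper class is illustrative only.
public class ModifierOrderSketch {
    // Flagged form (as on line 185 above):
    //   final static String DOC_CONFIG_PARAMETER_ARG_PREFIX = "doc-";

    // JLS-recommended order: access modifier, then static, then final.
    static final String DOC_CONFIG_PARAMETER_ARG_PREFIX = "doc-";
}
```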

@Parameter(
required = false,
@@ -270,11 +276,19 @@
}
IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);

try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
AtomicReference<IndexAndShardCursor> progressCursor = new AtomicReference<>();
try (var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId)
workerId);
var processManager = new LeaseExpireTrigger(
w -> exitOnLeaseTimeout(
workCoordinator,
w,
progressCursor,
context.getWorkCoordinationContext()::createSuccessorWorkItemsContext),
Clock.systemUTC()
);
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
@@ -307,6 +321,7 @@
run(
LuceneDocumentsReader.getFactory(sourceResourceProvider),
reindexer,
progressCursor,
workCoordinator,
arguments.initialLeaseDuration,
processManager,
@@ -326,8 +341,31 @@
}
}

private static void exitOnLeaseTimeout(String workItemId) {
@SneakyThrows
private static void exitOnLeaseTimeout(
IWorkCoordinator coordinator,
String workItemId,
AtomicReference<IndexAndShardCursor> progressCursorRef,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
) {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
var progressCursor = progressCursorRef.get();
if (progressCursor != null) {
log.error("Progress cursor: " + progressCursor.toString());
var successorWorkItem = progressCursor.toWorkItemString();
ArrayList<String> successorWorkItemIds = new ArrayList<>();
successorWorkItemIds.add(successorWorkItem);

coordinator.createSuccessorWorkItemsAndMarkComplete(
workItemId,
successorWorkItemIds,
contextSupplier
);
} else {
log.error("No progress cursor to create successor work items from.");
log.error("Skipping creation of successor work item to retry the existing one");
}

System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}
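A rough, self-contained sketch (not code from this repo) of the hand-off pattern the change above introduces: the reindexing pipeline publishes its last completed position into an AtomicReference, and the lease-expiry handler reads it to decide between carving out a successor work item and leaving the original item to be retried:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-ins for IndexAndShardCursor and IWorkCoordinator; only the
// hand-off pattern is illustrated here, not the real types or encodings.
class ProgressCursorHandOffSketch {
    record Cursor(String indexName, int shardNumber, int segId, int docId) {
        String toWorkItemString() {
            // The real encoding lives in IndexAndShardCursor; this format is invented.
            return indexName + "__" + shardNumber + "__" + segId + "__" + docId;
        }
    }

    interface Coordinator {
        void createSuccessorWorkItemsAndMarkComplete(String workItemId, List<String> successors);
    }

    static void onLeaseExpired(String workItemId,
                               AtomicReference<Cursor> progressCursor,
                               Coordinator coordinator) {
        var cursor = progressCursor.get();
        if (cursor != null) {
            // Progress was made: record a successor work item that starts at the cursor.
            coordinator.createSuccessorWorkItemsAndMarkComplete(workItemId, List.of(cursor.toWorkItemString()));
        }
        // Otherwise do nothing, so the original work item is simply retried later.
    }
}
```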

@@ -346,6 +384,7 @@

public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
AtomicReference<IndexAndShardCursor> progressCursor,
IWorkCoordinator workCoordinator,
Duration maxInitialLeaseDuration,
LeaseExpireTrigger leaseExpireTrigger,
@@ -370,14 +409,20 @@
)) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
}
var runner = new DocumentsRunner(scopedWorkCoordinator, maxInitialLeaseDuration, (name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
}, unpackerFactory, readerFactory, reindexer);
var runner = new DocumentsRunner(scopedWorkCoordinator,
maxInitialLeaseDuration,
reindexer,
unpackerFactory,
(name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
},
readerFactory,
progressCursor::set);
return runner.migrateNextShard(rootDocumentContext::createReindexContext);
}

@@ -71,9 +71,9 @@ protected DirectoryReader getReader() {
}

@Override
protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean isLive) {
protected RfsLuceneDocument getDocument(IndexReader reader, int luceneSegIndex, int luceneDocId, boolean isLive) {
ingestedDocuments.incrementAndGet();
return super.getDocument(reader, docId, isLive);
return super.getDocument(reader, luceneSegIndex, luceneDocId, isLive);
}
};
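For background (a sketch assuming a standard Lucene index layout, not code from this repo): Lucene document ids are assigned per segment, which is why the signature above carries both a segment index and a document id within that segment:

```java
import java.io.IOException;
import java.nio.file.Paths;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.FSDirectory;

// Background sketch only: iterates documents segment by segment, mirroring the
// (luceneSegIndex, luceneDocId) addressing used in the signature above.
class SegmentAddressingSketch {
    static void walk(String indexPath) throws IOException {
        try (var reader = DirectoryReader.open(FSDirectory.open(Paths.get(indexPath)))) {
            var leaves = reader.leaves(); // one leaf context per segment
            for (int segIndex = 0; segIndex < leaves.size(); segIndex++) {
                var leaf = leaves.get(segIndex).reader();
                for (int docId = 0; docId < leaf.maxDoc(); docId++) {
                    // (segIndex, docId) identifies the document without depending on a
                    // global doc id, which is derived from each segment's docBase.
                }
            }
        }
    }
}
```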

@@ -107,7 +107,7 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int docId, boolean i

// Start reindexing in a separate thread
Thread reindexThread = new Thread(() -> {
reindexer.reindex("test-index", reader.readDocuments(), mockContext).block();
reindexer.reindex("test-index", 0, reader.readDocuments(), mockContext).then().block();
});
reindexThread.start();

@@ -12,6 +12,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;

@@ -33,6 +34,7 @@
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.transform.TransformationLoader;
@@ -191,6 +193,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(

var defaultDocTransformer = new TransformationLoader().getTransformerFactoryLoader(RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG);

AtomicReference<IndexAndShardCursor> progressCursor = new AtomicReference<>();
try (var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
@@ -207,6 +210,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
.compressionEnabled(compressionEnabled)
.build()
.toConnectionContext()), 1000, Long.MAX_VALUE, 1, defaultDocTransformer),
progressCursor,
new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java
@@ -23,6 +23,11 @@
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;


/**
* BulkDocSection represents a single document in a bulk request. It tracks the shape of the document
* as needed for reindexing, as well as the metadata needed for the bulk request.
 */
[Review comment on lines +28 to +29, Collaborator]: thanks for the explanation. A rename still seems like it could be useful (BulkDocItem?) since the javadoc doesn't go along w/ every usage of the class
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
@Slf4j
public class BulkDocSection {
@@ -34,20 +39,16 @@

@EqualsAndHashCode.Include
@Getter
private final String docId;
private final String id;
private final BulkIndex bulkIndex;

public BulkDocSection(String id, String indexName, String type, String docBody) {
this(id, indexName, type, docBody, null);
}

public BulkDocSection(String id, String indexName, String type, String docBody, String routing) {
this.docId = id;
this.id = id;
this.bulkIndex = new BulkIndex(new BulkIndex.Metadata(id, type, indexName, routing), parseSource(docBody));
}

private BulkDocSection(BulkIndex bulkIndex) {
this.docId = bulkIndex.metadata.id;
this.id = bulkIndex.metadata.id;
this.bulkIndex = bulkIndex;
}
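For reference, a hypothetical call to the five-argument constructor kept in this hunk (the literal values are invented for illustration; the `null` type and routing mirror call sites elsewhere in this diff):

```java
import org.opensearch.migrations.bulkload.common.BulkDocSection;

// Hypothetical usage of the BulkDocSection(String id, String indexName, String type,
// String docBody, String routing) constructor; all literal values are made up.
class BulkDocSectionUsageSketch {
    static BulkDocSection example() {
        return new BulkDocSection(
            "test-index_1",            // document id
            "test-index",              // target index name
            null,                      // type (null on typeless clusters)
            "{\"field\":\"value\"}",   // raw JSON document body
            null                       // routing (null when unused)
        );
    }
}
```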

@@ -102,9 +103,12 @@

@SuppressWarnings("unchecked")
public Map<String, Object> toMap() {
return (Map<String, Object>) OBJECT_MAPPER.convertValue(bulkIndex, Map.class);

SonarQube check failure on line 106 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/BulkDocSection.java (java:S1905): Remove this unnecessary cast to "Map".
}
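One possible way to address this finding, sketched under the assumption that OBJECT_MAPPER is a Jackson ObjectMapper as in the surrounding class; a TypeReference removes both the cast and the need for the unchecked-cast suppression (whether that is preferable to simply dropping the cast is a team choice):

```java
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: a stand-in class showing a cast-free toMap() in the spirit of java:S1905.
class ToMapSketch {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final Object bulkIndex; // stands in for the real BulkIndex field

    ToMapSketch(Object bulkIndex) {
        this.bulkIndex = bulkIndex;
    }

    // convertValue with a TypeReference already returns Map<String, Object>,
    // so no explicit cast (and no @SuppressWarnings("unchecked")) is needed.
    public Map<String, Object> toMap() {
        return OBJECT_MAPPER.convertValue(bulkIndex, new TypeReference<Map<String, Object>>() {});
    }
}
```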

/**
* BulkIndex represents the serialization format of a single document in a bulk request.
*/
@NoArgsConstructor(force = true) // For Jackson
@AllArgsConstructor
@ToString
RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java
@@ -1,10 +1,11 @@
package org.opensearch.migrations.bulkload.common;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.opensearch.migrations.bulkload.worker.IndexAndShardCursor;
import org.opensearch.migrations.reindexer.tracing.IDocumentMigrationContexts.IDocumentReindexContext;
import org.opensearch.migrations.transform.IJsonTransformer;

@@ -26,19 +27,17 @@
private final int maxConcurrentWorkItems;
private final IJsonTransformer transformer;

public Mono<Void> reindex(String indexName, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
public Flux<IndexAndShardCursor> reindex(String indexName, int shardNumber, Flux<RfsLuceneDocument> documentStream, IDocumentReindexContext context) {
var scheduler = Schedulers.newParallel("DocumentBulkAggregator");
var bulkDocs = documentStream
var rfsDocs = documentStream
.publishOn(scheduler, 1)
.map(doc -> transformDocument(doc,indexName));
.map(doc -> transformDocument(doc, indexName, shardNumber));

return this.reindexDocsInParallelBatches(bulkDocs, indexName, context)
.doOnSuccess(unused -> log.debug("All batches processed"))
.doOnError(e -> log.error("Error prevented all batches from being processed", e))
return this.reindexDocsInParallelBatches(rfsDocs, indexName, shardNumber, context)
.doOnTerminate(scheduler::dispose);
}

Mono<Void> reindexDocsInParallelBatches(Flux<BulkDocSection> docs, String indexName, IDocumentReindexContext context) {
Flux<IndexAndShardCursor> reindexDocsInParallelBatches(Flux<RfsDocument> docs, String indexName, int shardNumber, IDocumentReindexContext context) {

SonarQube check failure on line 40 in RFS/src/main/java/org/opensearch/migrations/bulkload/common/DocumentReindexer.java (java:S1172): Remove this unused method parameter "shardNumber".
// Use parallel scheduler for send subscription due on non-blocking io client
var scheduler = Schedulers.newParallel("DocumentBatchReindexer");
var bulkDocsBatches = batchDocsBySizeOrCount(docs);
@@ -47,24 +46,33 @@
return bulkDocsBatches
.limitRate(bulkDocsToBuffer, 1) // Bulk Doc Buffer, Keep Full
.publishOn(scheduler, 1) // Switch scheduler
.flatMap(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
.flatMapSequential(docsGroup -> sendBulkRequest(UUID.randomUUID(), docsGroup, indexName, context, scheduler),
maxConcurrentWorkItems)
.doOnTerminate(scheduler::dispose)
.then();
.doOnTerminate(scheduler::dispose);
}

@SneakyThrows
BulkDocSection transformDocument(RfsLuceneDocument doc, String indexName) {
var original = new BulkDocSection(doc.id, indexName, doc.type, doc.source, doc.routing);
RfsDocument transformDocument(RfsLuceneDocument doc, String indexName, int shardNumber) {
var finalDocument = RfsDocument.fromLuceneDocument(doc, indexName, shardNumber);
if (transformer != null) {
final Map<String,Object> transformedDoc = transformer.transformJson(original.toMap());
return BulkDocSection.fromMap(transformedDoc);
finalDocument = RfsDocument.transform(transformer::transformJson, finalDocument);
}
return BulkDocSection.fromMap(original.toMap());
return finalDocument;
}

Mono<Void> sendBulkRequest(UUID batchId, List<BulkDocSection> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
return client.sendBulkRequest(indexName, docsBatch, context.createBulkRequest()) // Send the request
/*
* TODO: Update the reindexing code to rely on _index field embedded in each doc section rather than requiring it in the
* REST path. See: https://opensearch.atlassian.net/browse/MIGRATIONS-2232
*/
Mono<IndexAndShardCursor> sendBulkRequest(UUID batchId, List<RfsDocument> docsBatch, String indexName, IDocumentReindexContext context, Scheduler scheduler) {
var lastDoc = docsBatch.get(docsBatch.size() - 1);
log.atInfo().setMessage("Last doc is: Index " + lastDoc.indexName + "Shard " + lastDoc.shardNumber + " Seg Id " + lastDoc.luceneSegId + " Lucene ID " + lastDoc.luceneDocId).log();

List<BulkDocSection> bulkDocSections = docsBatch.stream()
.map(rfsDocument -> rfsDocument.document)
.collect(Collectors.toList());

return client.sendBulkRequest(indexName, bulkDocSections, context.createBulkRequest()) // Send the request
.doFirst(() -> log.atInfo().setMessage("Batch Id:{}, {} documents in current bulk request.")
.addArgument(batchId)
.addArgument(docsBatch::size)
Expand All @@ -76,19 +84,19 @@
.log())
// Prevent the error from stopping the entire stream, retries occurring within sendBulkRequest
.onErrorResume(e -> Mono.empty())
.then() // Discard the response object
.subscribeOn(scheduler);
.then(Mono.just(new IndexAndShardCursor(indexName, lastDoc.shardNumber, lastDoc.luceneSegId, lastDoc.luceneDocId))
.subscribeOn(scheduler));
}
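To make the TODO at the top of sendBulkRequest concrete: the bulk API can take the target index either from the request path or from each action line. A rough helper (generic bulk-API shape, not code from this repository) showing the embedded-`_index` form; with `_index` carried per document, a single request could span multiple indices, which appears to be what MIGRATIONS-2232 tracks:

```java
// Illustrative only: builds one bulk action/source pair with "_index" embedded in the
// action metadata, so the request can target POST /_bulk instead of POST /{index}/_bulk.
class BulkLineSketch {
    static String toBulkLine(String indexName, String docId, String docBody) {
        return "{\"index\":{\"_index\":\"" + indexName + "\",\"_id\":\"" + docId + "\"}}\n"
            + docBody + "\n";
    }
}
```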

Flux<List<BulkDocSection>> batchDocsBySizeOrCount(Flux<BulkDocSection> docs) {
Flux<List<RfsDocument>> batchDocsBySizeOrCount(Flux<RfsDocument> docs) {
return docs.bufferUntil(new Predicate<>() {
private int currentItemCount = 0;
private long currentSize = 0;

@Override
public boolean test(BulkDocSection next) {
public boolean test(RfsDocument next) {
// Add one for newline between bulk sections
var nextSize = next.getSerializedLength() + 1L;
var nextSize = next.document.getSerializedLength() + 1L;
currentSize += nextSize;
currentItemCount++;
