Adding subshard work items on lease expiry #1160

Merged Dec 5, 2024 (21 commits)

Commits
55a83ca
Checkpoint - code in place for subshard work items, need to test
chelma Nov 22, 2024
3429a5a
Improved cursor plumbing for RFS SubShard work items
chelma Nov 25, 2024
2aae632
Additional changes per PR comments
chelma Nov 25, 2024
2b33a84
Merge remote-tracking branch 'upstream/main' into MIGRATIONS-2128
AndreKurait Nov 25, 2024
2c2a708
Modify LuceneDocumentsReader to read docs/segments sequentially
AndreKurait Nov 25, 2024
56839cd
Refactor of partial shard work items - added sequential doc reading, …
AndreKurait Dec 2, 2024
cf6ed86
Fix spotless issues
AndreKurait Dec 2, 2024
920be77
Working subshard
AndreKurait Dec 3, 2024
d8c4372
Rename numAttempts to leaseAcquisitionExponent and add max exponent b…
AndreKurait Dec 4, 2024
40eca92
Add worker cancellation on lease expiration
AndreKurait Dec 4, 2024
6211c33
Fix lucene starting doc id
AndreKurait Dec 4, 2024
e403228
Add lease duration decrease if shard setup is < 2.5% of lease time
AndreKurait Dec 4, 2024
e9ce08e
Fix WorkCoordinatorTest.java
AndreKurait Dec 4, 2024
5d82fbe
Add LeaseExpirationTest
AndreKurait Dec 5, 2024
e4be465
Fix scheduler dispose
AndreKurait Dec 5, 2024
b5640f5
Merge branch 'main' into MIGRATIONS-2128
AndreKurait Dec 5, 2024
8494eec
Address spotless
AndreKurait Dec 5, 2024
9820fa1
Address comments for LeaseExpirationTest
AndreKurait Dec 5, 2024
2d3ed9c
Update messaging on deletedDocs
AndreKurait Dec 5, 2024
c4dcbc4
Update RFS Design doc with successor work items
AndreKurait Dec 5, 2024
178fe55
Fix WorkCoordinatorTest
AndreKurait Dec 5, 2024
@@ -53,7 +53,8 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workl
log.atTrace().setMessage("Created doc for index {}: {}")
.addArgument(indexName)
.addArgument(doc::toString).log();
return new BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), indexName, null, doc.toString());
var docId = docIdCounter.incrementAndGet();
return new BulkDocSection(indexName + "_" + docId, indexName, null, doc.toString(), null);
})
.collect(Collectors.toList());

@@ -5,8 +5,13 @@
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

import org.opensearch.migrations.bulkload.common.DefaultSourceRepoAccessor;
import org.opensearch.migrations.bulkload.common.DocumentReindexer;
@@ -20,13 +25,16 @@
import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.bulkload.models.IndexMetadata;
import org.opensearch.migrations.bulkload.models.ShardMetadata;
import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts;
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
@@ -45,6 +53,7 @@
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

@@ -55,6 +64,9 @@ public class RfsMigrateDocuments {
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
public static final String LOGGING_MDC_WORKER_ID = "workerId";

// Increase successor nextAcquisitionLeaseExponent if shard setup takes more than 10% of lease total time
private static final double SHARD_SETUP_LEASE_DURATION_THRESHOLD = 0.1;

public static final String DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG = "[" +
" {" +
" \"JsonTransformerForDocumentTypeRemovalProvider\":\"\"" +
@@ -270,11 +282,28 @@ public static void main(String[] args) throws Exception {
}
IJsonTransformer docTransformer = new TransformationLoader().getTransformerFactoryLoader(docTransformerConfig);

try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
var workItemRef = new AtomicReference<IWorkCoordinator.WorkItemAndDuration>();
var progressCursor = new AtomicReference<WorkItemCursor>();
var cancellationRunnableRef = new AtomicReference<Runnable>();
var workItemTimeProvider = new WorkItemTimeProvider();
try (var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId)
workerId,
Clock.systemUTC(),
workItemRef::set);
Reviewer comment (Collaborator): There's a smell here. The coordinator code returns a value and also runs a hook with that same value. You also need to monitor the progress cursor, and that responsibility gets pushed onto the next layer down. Combining this with the progress monitoring and extracting both from the coordinator seem like they'd make the code more maintainable and easier to follow.

What really matters is what the DocumentsRunner is using, not what the coordinator returned. This plumbs some brittle assumptions through the codebase that work today but add debt.
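As an illustration of the extraction suggested above, here is a minimal sketch, not part of this PR, of a caller-owned holder for the live work item and its progress cursor; the class and method names are hypothetical. With something like this, the lease-expiry path reads one object whose null-versus-set semantics are documented in a single place instead of juggling separate AtomicReferences.

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;

// Hypothetical sketch: one object owns the acquired work item and the latest progress cursor.
final class WorkProgressState {
    private final AtomicReference<IWorkCoordinator.WorkItemAndDuration> workItem = new AtomicReference<>();
    private final AtomicReference<WorkItemCursor> cursor = new AtomicReference<>();

    void startWorkItem(IWorkCoordinator.WorkItemAndDuration item) {
        workItem.set(item);
    }

    void advanceCursor(WorkItemCursor nextCursor) {
        if (workItem.get() == null) {
            // Enforce the invariant at the point progress is recorded rather than at teardown.
            throw new IllegalStateException("Progress reported before a work item was acquired");
        }
        cursor.set(nextCursor);
    }

    Optional<IWorkCoordinator.WorkItemAndDuration> currentWorkItem() {
        return Optional.ofNullable(workItem.get());
    }

    Optional<WorkItemCursor> latestCursor() {
        return Optional.ofNullable(cursor.get());
    }
}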
var processManager = new LeaseExpireTrigger(
Reviewer comment (Collaborator): This is pretty unwieldy. Most of these values should probably be fields on the instance, and exitOnLeaseTimeout should use an instance method to do the work continuation instead of a static. Most of the logic within that function is relevant to two other critical use cases as well: shutdown from SIGTERM and unexpected exceptions (including OOM). A refactor that makes the inner cleanup method simpler and more compact would make enabling those other use cases trivial.

I also cannot figure out why the processManager object doesn't get created within the run method, even if it's constructed via a supplier when there is some policy we want to thread through. It's all around hard to follow (sorry, I think some of this comes from my initial work that put it here in the first place).
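A rough sketch of the refactor direction described above, using illustrative names and placeholder exit codes that are not from this PR: the shared state becomes instance fields, and one cleanup method serves lease expiry, SIGTERM, and fatal-error handling alike.

import java.util.concurrent.atomic.AtomicReference;

// Hypothetical shape only; fields, method names, and exit codes are placeholders.
final class MigrationProcessLifecycle {
    private final AtomicReference<Runnable> cancellationRef = new AtomicReference<>();
    private final AtomicReference<String> currentWorkItemId = new AtomicReference<>();

    void onLeaseExpired() {
        cleanUpAndExit(3);      // placeholder for PROCESS_TIMED_OUT_EXIT_CODE
    }

    void onSigterm() {
        cleanUpAndExit(143);    // conventional exit status after SIGTERM
    }

    void onFatalError(Throwable t) {
        cleanUpAndExit(1);
    }

    private void cleanUpAndExit(int exitCode) {
        var cancel = cancellationRef.get();
        if (cancel != null) {
            cancel.run();       // stop the in-flight document migration
        }
        // Create successor work items for currentWorkItemId.get() here, then exit.
        System.exit(exitCode);
    }
}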
w -> exitOnLeaseTimeout(
workItemRef,
workCoordinator,
w,
progressCursor,
workItemTimeProvider,
arguments.initialLeaseDuration,
() -> Optional.ofNullable(cancellationRunnableRef.get()).ifPresent(Runnable::run),
context.getWorkCoordinationContext()::createSuccessorWorkItemsContext),
Clock.systemUTC()
);
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
@@ -307,6 +336,7 @@ public static void main(String[] args) throws Exception {
run(
LuceneDocumentsReader.getFactory(sourceResourceProvider),
reindexer,
progressCursor,
workCoordinator,
arguments.initialLeaseDuration,
processManager,
@@ -316,7 +346,9 @@
sourceResourceProvider.getShardMetadata(),
unpackerFactory,
arguments.maxShardSizeBytes,
context);
context,
cancellationRunnableRef,
workItemTimeProvider);
} catch (NoWorkLeftException e) {
log.atWarn().setMessage("No work left to acquire. Exiting with error code to signal that.").log();
System.exit(NO_WORK_LEFT_EXIT_CODE);
@@ -326,11 +358,105 @@
}
}

private static void exitOnLeaseTimeout(String workItemId) {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
@SneakyThrows
private static void exitOnLeaseTimeout(
AtomicReference<IWorkCoordinator.WorkItemAndDuration> workItemRef,
IWorkCoordinator coordinator,
String workItemId,
AtomicReference<WorkItemCursor> progressCursorRef,
WorkItemTimeProvider workItemTimeProvider,
Duration initialLeaseDuration,
Runnable cancellationRunnable,
Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
) {
log.atWarn().setMessage("Terminating RfsMigrateDocuments because the lease has expired for {}")
.addArgument(workItemId)
.log();
if (progressCursorRef.get() != null) {
Reviewer comment (Collaborator): I'd love for this to be a field so that I could check what the semantics are. Can this go back to null after it was non-null? What would that indicate? As it is, it's hard to know what the semantics should be since we're passing it as a parameter (the value could be any old atomic).
log.atWarn().setMessage("Progress cursor set, cancelling active doc migration if still running").log();
Reviewer comment (Collaborator): What does 'if' mean here?
cancellationRunnable.run();
Reviewer comment (Collaborator): There isn't a guarantee on how long this will take to run, so it would be good to run it on an async thread and only wait for so many seconds (maybe 10% of the time we've allocated to wrapping up?).
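A minimal, self-contained sketch of the bounded wait proposed above; the helper name and the idea of passing the time budget in from the caller are assumptions, not part of this PR.

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Run the cancellation hook on its own thread and wait at most `budget` before moving on.
final class BoundedCancellation {
    static void cancelWithBudget(Runnable cancellation, Duration budget) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> pending = executor.submit(cancellation);
            pending.get(budget.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up waiting and proceed to create successor items from the last observed cursor.
        } catch (ExecutionException e) {
            // The cancellation hook itself failed; continue the shutdown path regardless.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();     // interrupt the hook if it is still running
        }
    }
}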
// Get a new progressCursor after cancellation for most up-to-date checkpoint
var progressCursor = progressCursorRef.get();
log.atWarn().setMessage("Progress cursor: {}")
.addArgument(progressCursor).log();
var workItemAndDuration = workItemRef.get();
if (workItemAndDuration == null) {
throw new IllegalStateException("Unexpected state with progressCursor set without a" +
"work item");
Reviewer comment (Collaborator) on lines +386 to +387: This feels like an assertion we should be making when we set the progress, or something that our model should make fundamentally impossible to express. The model for where and when we hook these updates is part of the issue (see the comment about the hook in the coordinator).
}
log.atWarn().setMessage("Work Item and Duration: {}").addArgument(workItemAndDuration)
.log();
log.atWarn().setMessage("Work Item: {}").addArgument(workItemAndDuration.getWorkItem())
.log();
var successorWorkItemIds = getSuccessorWorkItemIds(workItemAndDuration, progressCursor);
log.atWarn().setMessage("Successor Work Ids: {}").addArgument(String.join(", ", successorWorkItemIds))
.log();
var successorNextAcquisitionLeaseExponent = getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, workItemAndDuration.getLeaseExpirationTime());
coordinator.createSuccessorWorkItemsAndMarkComplete(
workItemId,
successorWorkItemIds,
successorNextAcquisitionLeaseExponent,
contextSupplier
);
} else {
log.atWarn().setMessage("No progress cursor to create successor work items from. This can happen when" +
"downloading and unpacking shard takes longer than the lease").log();
log.atWarn().setMessage("Skipping creation of successor work item to retry the existing one with more time")
.log();
}

System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}

protected static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
Instant leaseExpirationTime) {
Reviewer comment (Collaborator) on lines +413 to +414: Nit: weird formatting.
if (workItemTimeProvider.getLeaseAcquisitionTimeRef().get() == null ||
workItemTimeProvider.getDocumentMigraionStartTimeRef().get() == null) {
throw new IllegalStateException("Unexpected state with either leaseAquisitionTime or" +
"documentMigrationStartTime as null while creating successor work item");
}
var leaseAcquisitionTime = workItemTimeProvider.getLeaseAcquisitionTimeRef().get();
var documentMigrationStartTime = workItemTimeProvider.getDocumentMigraionStartTimeRef().get();
Reviewer comment (Collaborator) on lines +420 to +421: It's a smell to read a value from an atomic above to do a test, then to read it again and assume the value hasn't changed in between. You should capture these values once and use the captured values for the test and for the other evaluations.
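The capture-once pattern the comment asks for might look like the following rearrangement (a sketch, not the PR's code): each AtomicReference is read a single time, and the captured locals feed both the null check and the arithmetic that follows.

var leaseAcquisitionTime = workItemTimeProvider.getLeaseAcquisitionTimeRef().get();
var documentMigrationStartTime = workItemTimeProvider.getDocumentMigraionStartTimeRef().get();
if (leaseAcquisitionTime == null || documentMigrationStartTime == null) {
    throw new IllegalStateException("Expected both the lease acquisition time and the document "
        + "migration start time to be set before computing the successor lease exponent");
}
// Only the captured locals are used from this point on.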
var leaseDuration = Duration.between(leaseAcquisitionTime, leaseExpirationTime);
var leaseDurationFactor = (double) leaseDuration.toMillis() / initialLeaseDuration.toMillis();
// 2 ^ n = leaseDurationFactor <==> log2(leaseDurationFactor) = n, n >= 0
var existingNextAcquisitionLeaseExponent = Math.max(Math.round(Math.log(leaseDurationFactor) / Math.log(2)), 0);
var shardSetupDuration = Duration.between(leaseAcquisitionTime, documentMigrationStartTime);
var successorShardNextAcquisitionLeaseExponent = (int) (existingNextAcquisitionLeaseExponent + (((double) shardSetupDuration.toMillis() / leaseDuration.toMillis() > SHARD_SETUP_LEASE_DURATION_THRESHOLD) ? 1 : 0));

log.atDebug().setMessage("SuccessorNextAcquisitionLeaseExponent calculated values:" +
"\nleaseAcquisitionTime:{}" +
"\ndocumentMigrationStartTime:{}" +
"\nleaseDuration:{}" +
"\nleaseDurationFactor:{}" +
"\nexistingNextAcquisitionLeaseExponent:{}" +
"\nshardSetupDuration:{}" +
"\nsuccessorShardNextAcquisitionLeaseExponent:{}")
.addArgument(leaseAcquisitionTime)
.addArgument(documentMigrationStartTime)
.addArgument(leaseDuration)
.addArgument(leaseDurationFactor)
.addArgument(existingNextAcquisitionLeaseExponent)
.addArgument(shardSetupDuration)
.addArgument(successorShardNextAcquisitionLeaseExponent)
.log();

return successorShardNextAcquisitionLeaseExponent;
}

private static ArrayList<String> getSuccessorWorkItemIds(IWorkCoordinator.WorkItemAndDuration workItemAndDuration, WorkItemCursor progressCursor) {
if (workItemAndDuration == null) {
throw new IllegalStateException("Unexpected worker coordination state. Expected workItem set when progressCursor not null.");
}
var workItem = workItemAndDuration.getWorkItem();
var successorWorkItem = new IWorkCoordinator.WorkItemAndDuration
.WorkItem(workItem.getIndexName(), workItem.getShardNumber(),
progressCursor.getDocId() + 1);
ArrayList<String> successorWorkItemIds = new ArrayList<>();
successorWorkItemIds.add(successorWorkItem.toString());
return successorWorkItemIds;
}

private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
@@ -346,6 +472,7 @@

public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
AtomicReference<WorkItemCursor> progressCursor,
IWorkCoordinator workCoordinator,
Duration maxInitialLeaseDuration,
LeaseExpireTrigger leaseExpireTrigger,
@@ -355,7 +482,9 @@ public static DocumentsRunner.CompletionStatus run(Function<Path, LuceneDocument
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes,
RootDocumentMigrationContext rootDocumentContext)
RootDocumentMigrationContext rootDocumentContext,
AtomicReference<Runnable> cancellationRunnable,
WorkItemTimeProvider timeProvider)
throws IOException, InterruptedException, NoWorkLeftException
{
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger);
@@ -370,14 +499,22 @@
)) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
}
var runner = new DocumentsRunner(scopedWorkCoordinator, maxInitialLeaseDuration, (name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
}, unpackerFactory, readerFactory, reindexer);
var runner = new DocumentsRunner(scopedWorkCoordinator,
maxInitialLeaseDuration,
reindexer,
unpackerFactory,
(name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
}
return shardMetadata;
},
readerFactory,
progressCursor::set,
cancellationRunnable::set,
Reviewer comment (Collaborator): Smell: why does the DocumentsRunner take a lambda to specify its own cancellation? Can't the DocumentsRunner have its own cancellation method that we invoke directly when it's time?
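One way to address this, sketched here with assumed names and a Reactor Disposable handle rather than the PR's actual internals, is for the runner to keep its own reference to the in-flight work and expose cancellation as a method that callers invoke directly.

import reactor.core.Disposable;

// Hypothetical sketch: the runner records the subscription it creates and exposes cancel()
// instead of handing a cancellation Runnable back through an AtomicReference.
public class CancellableDocumentsRunner {
    private volatile Disposable activeSubscription;    // set when the shard migration pipeline is subscribed

    protected void register(Disposable subscription) {
        this.activeSubscription = subscription;
    }

    public void cancel() {
        var subscription = activeSubscription;
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();                     // stop the in-flight reactive pipeline
        }
    }
}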
timeProvider);
return runner.migrateNextShard(rootDocumentContext::createReindexContext);
}

@@ -0,0 +1,75 @@
package org.opensearch.migrations;

import java.time.Duration;
import java.time.Instant;

import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


class RfsMigrateDocumentsTest {


private static class TestClass extends RfsMigrateDocuments {
public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
Instant leaseExpirationTime) {
return RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
}
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(59);
var initialShardAttempts = 0;
var initialLeaseMultiple = (int) Math.pow(2, initialShardAttempts);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorAttempts = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(initialShardAttempts, successorAttempts, "Should return initialShardAttempts when shard prep time is less than 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_EqualToThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(60);
var initialShardAttempts = 0;
var initialLeaseMultiple = (int) Math.pow(2, initialShardAttempts);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorAttempts = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(initialShardAttempts, successorAttempts, "Should return initialShardAttempts when shard prep time is equal to 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_ExceedsThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(61);
var initialShardAttempts = 0;
var initialLeaseMultiple = (int) Math.pow(2, initialShardAttempts);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorAttempts = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(initialShardAttempts + 1, successorAttempts, "Should return initialShardAttempts + 1 when shard prep time is greater than 10% of lease duration");
}
}
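To make the threshold arithmetic in these tests concrete, here is a standalone sketch, assuming a 10-minute initial lease on a first-attempt work item (exponent 0): 59 seconds of shard prep is about 9.8% of the lease, 60 seconds is exactly 10% (not strictly greater than the threshold), and 61 seconds is about 10.2%, so only the last case bumps the successor exponent.

import java.time.Duration;

// Standalone arithmetic sketch mirroring the threshold logic exercised by the tests above.
public class LeaseExponentArithmetic {
    public static void main(String[] args) {
        Duration initialLease = Duration.ofMinutes(10);     // 600 seconds
        long[] shardPrepSeconds = {59, 60, 61};             // below, at, and above the 10% threshold
        for (long prep : shardPrepSeconds) {
            // First attempt: the lease equals the initial lease, so the factor is 1 and log2(1) = 0.
            double leaseDurationFactor = 1.0;
            long existingExponent = Math.max(Math.round(Math.log(leaseDurationFactor) / Math.log(2)), 0);
            double setupFraction = (double) Duration.ofSeconds(prep).toMillis() / initialLease.toMillis();
            int successorExponent = (int) (existingExponent + (setupFraction > 0.1 ? 1 : 0));
            System.out.printf("prep=%ds setupFraction=%.4f successorExponent=%d%n",
                prep, setupFraction, successorExponent);
        }
    }
}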