Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfixes in RFS work coordination update queries to claim leases #769

Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Clock;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -58,6 +60,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -112,10 +115,11 @@ public void test(String sourceImageName, String targetImageName, int numWorkers)

var workerFutures = new ArrayList<CompletableFuture<Void>>();
var runCounter = new AtomicInteger();
final var clockJitter = new Random(1);
for (int i = 0; i < numWorkers; ++i) {
workerFutures.add(CompletableFuture.supplyAsync(() ->
migrateDocumentsSequentially(sourceRepo, SNAPSHOT_NAME, INDEX_ALLOWLIST,
osTargetContainer.getHttpHostAddress(), runCounter)));
osTargetContainer.getHttpHostAddress(), runCounter, clockJitter)));
}
var thrownException = Assertions.assertThrows(ExecutionException.class, () ->
CompletableFuture.allOf(workerFutures.toArray(CompletableFuture[]::new)).get());
Expand Down Expand Up @@ -180,10 +184,12 @@ private Void migrateDocumentsSequentially(FileSystemRepo sourceRepo,
String snapshotName,
List<String> indexAllowlist,
String targetAddress,
AtomicInteger runCounter) {
AtomicInteger runCounter,
Random clockJitter) {
for (int runNumber=0; ; ++runNumber) {
try {
var workResult = migrateDocumentsWithOneWorker(sourceRepo, snapshotName, indexAllowlist, targetAddress);
var workResult = migrateDocumentsWithOneWorker(sourceRepo, snapshotName, indexAllowlist, targetAddress,
clockJitter);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
return null;
} else {
Expand Down Expand Up @@ -234,7 +240,8 @@ static class LeasePastError extends Error { }
private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRepo sourceRepo,
String snapshotName,
List<String> indexAllowlist,
String targetAddress)
String targetAddress,
Random clockJitter)
throws RfsMigrateDocuments.NoWorkLeftException
{
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
Expand All @@ -257,14 +264,19 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(SourceRep
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
final int ms_window = 1000;
final var nextClockShift = (int)(clockJitter.nextDouble() * ms_window)-(ms_window/2);
log.info("nextClockShift="+nextClockShift);

return RfsMigrateDocuments.run(path -> new FilteredLuceneDocumentsReader(path, terminatingDocumentFilter),
new DocumentReindexer(new OpenSearchClient(targetAddress, null)),
new OpenSearchWorkCoordinator(
new ApacheHttpClient(new URI(targetAddress)),
// new ReactorHttpClient(new ConnectionDetails(osTargetContainer.getHttpHostAddress(),
// null, null)),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString()),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, UUID.randomUUID().toString(),
Clock.offset(Clock.systemUTC(),
Duration.ofMillis(nextClockShift))),
processManager,
indexMetadataFactory,
snapshotName,
Expand Down
58 changes: 45 additions & 13 deletions RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
@Slf4j
public class OpenSearchWorkCoordinator implements IWorkCoordinator {
public static final String INDEX_NAME = ".migrations_working_state";
public static final int MAX_RETRIES = 6; // at 100ms, the total delay will be 105s
public static final int MAX_REFRESH_RETRIES = 6;
public static final int MAX_SETUP_RETRIES = 6;
public static final int MAX_JITTER_RETRIES = 6;

public static final String PUT_METHOD = "PUT";
public static final String POST_METHOD = "POST";
Expand Down Expand Up @@ -104,7 +106,7 @@
"}\n";

try {
doUntil("setup-" + INDEX_NAME, 100, MAX_RETRIES,
doUntil("setup-" + INDEX_NAME, 100, MAX_SETUP_RETRIES,
() -> {
try {
return httpClient.makeJsonRequest(PUT_METHOD, INDEX_NAME, null, body);
Expand Down Expand Up @@ -384,6 +386,8 @@
" ctx._source." + EXPIRATION_FIELD_NAME + " = newExpiration;" +
" ctx._source." + LEASE_HOLDER_ID_FIELD_NAME + " = params.workerId;" +
" ctx._source.numAttempts += 1;" +
" } else {" +
" ctx.op = \\\"noop\\\";" +
" }" +
"\" " + // end of source script contents
"}" + // end of script block
Expand All @@ -400,15 +404,22 @@

var response = httpClient.makeJsonRequest(POST_METHOD, INDEX_NAME + "/_update_by_query?refresh=true&max_docs=1",
null, body);
if (response.getStatusCode() == 409) {
return UpdateResult.VERSION_CONFLICT;

Check warning on line 408 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L408

Added line #L408 was not covered by tests
Copy link
Member

@AndreKurait AndreKurait Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to check for resultTree['noop'] > 0 on the below elseIf [line 413]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks - addressed in the latest commit (if I understood the concern)

}
var resultTree = objectMapper.readTree(response.getPayloadStream());
final var numUpdated = resultTree.path(UPDATED_COUNT_FIELD_NAME).longValue();
final var noops = resultTree.path("noops").longValue();

Check warning on line 412 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L412

Added line #L412 was not covered by tests
assert numUpdated <= 1;
if (numUpdated > 0) {
return UpdateResult.SUCCESSFUL_ACQUISITION;
} else if (resultTree.path(VERSION_CONFLICTS_FIELD_NAME).longValue() > 0) {
return UpdateResult.VERSION_CONFLICT;
} else if (resultTree.path("total").longValue() == 0) {
return UpdateResult.NOTHING_TO_ACQUIRE;
} else if (noops > 0) {
throw new PotentialClockDriftDetectedException("Found " + noops +

Check warning on line 421 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L421

Added line #L421 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VERSION_CONFLICT isn't being handled with an exception, what do you think about adding UpdateResult.CLOCK_DRIFT and log the number of noops found here?

This cleans up the logic around acquireNextWorkItem(...) to be more consistent.

" noop values in response with no successful updates");
} else {
throw new IllegalStateException("Unexpected response for update: " + resultTree);
}
Expand Down Expand Up @@ -460,6 +471,12 @@
Object transformedValue;
}

private static class PotentialClockDriftDetectedException extends IllegalStateException {
public PotentialClockDriftDetectedException(String s) {
super(s);
}

Check warning on line 477 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L476-L477

Added lines #L476 - L477 were not covered by tests
}

public static <T,U> U doUntil(String labelThatShouldBeAContext, long initialRetryDelayMs, int maxTries,
Supplier<T> supplier, Function<T,U> transformer, BiPredicate<T,U> test)
throws InterruptedException, MaxTriesExceededException
Expand All @@ -484,7 +501,7 @@

private void refresh() throws IOException, InterruptedException {
try {
doUntil("refresh", 100, MAX_RETRIES, () -> {
doUntil("refresh", 100, MAX_REFRESH_RETRIES, () -> {

Check warning on line 504 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L504

Added line #L504 was not covered by tests
try {
return httpClient.makeJsonRequest(GET_METHOD, INDEX_NAME + "/_refresh",null,null);
} catch (IOException e) {
Expand All @@ -499,17 +516,32 @@

public WorkAcquisitionOutcome acquireNextWorkItem(Duration leaseDuration) throws IOException, InterruptedException {
refresh();
int jitterRecoveryTimeMs = 10;
int tries = 0;

Check warning on line 520 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L519-L520

Added lines #L519 - L520 were not covered by tests
while (true) {
final var obtainResult = assignOneWorkItem(leaseDuration.toSeconds());
switch (obtainResult) {
case SUCCESSFUL_ACQUISITION:
return getAssignedWorkItem();
case NOTHING_TO_ACQUIRE:
return new NoAvailableWorkToBeDone();
case VERSION_CONFLICT:
continue;
default:
throw new IllegalStateException("unknown result from the assignOneWorkItem: " + obtainResult);
try {
final var obtainResult = assignOneWorkItem(leaseDuration.toSeconds());

Check warning on line 523 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L523

Added line #L523 was not covered by tests
switch (obtainResult) {
case SUCCESSFUL_ACQUISITION:
return getAssignedWorkItem();

Check warning on line 526 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L526

Added line #L526 was not covered by tests
case NOTHING_TO_ACQUIRE:
return new NoAvailableWorkToBeDone();

Check warning on line 528 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L528

Added line #L528 was not covered by tests
case VERSION_CONFLICT:
continue;

Check warning on line 530 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L530

Added line #L530 was not covered by tests
default:
throw new IllegalStateException("unknown result from the assignOneWorkItem: " + obtainResult);

Check warning on line 532 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L532

Added line #L532 was not covered by tests
}
} catch (PotentialClockDriftDetectedException e) {

Check warning on line 534 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L534

Added line #L534 was not covered by tests
if (++tries > MAX_JITTER_RETRIES) {
throw e;

Check warning on line 536 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L536

Added line #L536 was not covered by tests
}
int finalJitterRecoveryTimeMs = jitterRecoveryTimeMs;
log.atWarn().setMessage(()->"Couldn't complete work assignment. " +

Check warning on line 539 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L538-L539

Added lines #L538 - L539 were not covered by tests
"Presuming that the issue was due to clock synchronization. " +
"Backing off " + finalJitterRecoveryTimeMs +"ms and trying again.")
.setCause(e).log();
Thread.sleep(jitterRecoveryTimeMs);
jitterRecoveryTimeMs *= 2;

Check warning on line 544 in RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/cms/OpenSearchWorkCoordinator.java#L542-L544

Added lines #L542 - L544 were not covered by tests
}
}
}
Expand Down
Loading