Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding subshard work items on lease expiry #1160

Merged
merged 21 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
55a83ca
Checkpoint - code in place for subshard work items, need to test
chelma Nov 22, 2024
3429a5a
Improved cursor plumbing for RFS SubShard work items
chelma Nov 25, 2024
2aae632
Additional changes per PR comments
chelma Nov 25, 2024
2b33a84
Merge remote-tracking branch 'upstream/main' into MIGRATIONS-2128
AndreKurait Nov 25, 2024
2c2a708
Modify LuceneDocumentsReader to read docs/segments sequentially
AndreKurait Nov 25, 2024
56839cd
Refactor of partial shard work items - added sequential doc reading, …
AndreKurait Dec 2, 2024
cf6ed86
Fix spotless issues
AndreKurait Dec 2, 2024
920be77
Working subshard
AndreKurait Dec 3, 2024
d8c4372
Rename numAttempts to leaseAcquisitionExponent and add max exponent b…
AndreKurait Dec 4, 2024
40eca92
Add worker cancellation on lease expiration
AndreKurait Dec 4, 2024
6211c33
Fix lucene starting doc id
AndreKurait Dec 4, 2024
e403228
Add lease duration decrease if shard setup is < 2.5% of lease time
AndreKurait Dec 4, 2024
e9ce08e
Fix WorkCoordinatorTest.java
AndreKurait Dec 4, 2024
5d82fbe
Add LeaseExpirationTest
AndreKurait Dec 5, 2024
e4be465
Fix scheduler dispose
AndreKurait Dec 5, 2024
b5640f5
Merge branch 'main' into MIGRATIONS-2128
AndreKurait Dec 5, 2024
8494eec
Address spotless
AndreKurait Dec 5, 2024
9820fa1
Address comments for LeaseExpirationTest
AndreKurait Dec 5, 2024
2d3ed9c
Update messaging on deletedDocs
AndreKurait Dec 5, 2024
c4dcbc4
Update RFS Design doc with successor work items
AndreKurait Dec 5, 2024
178fe55
Fix WorkCoordinatorTest
AndreKurait Dec 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workl
log.atTrace().setMessage("Created doc for index {}: {}")
.addArgument(indexName)
.addArgument(doc::toString).log();
return new BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), indexName, null, doc.toString());
var docId = docIdCounter.incrementAndGet();
return new BulkDocSection(indexName + "_" + docId, indexName, null, doc.toString(), null);
})
.collect(Collectors.toList());

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package org.opensearch.migrations;

import java.time.Duration;
import java.time.Instant;

import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;


class RfsMigrateDocumentsTest {


private static class TestClass extends RfsMigrateDocuments {
public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
Instant leaseExpirationTime) {
return RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
}
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

// Lease at 40 minutes, shard prep 59 seconds, successor lease should be decreased since shard prep is < 2.5%
// and exponent is > 0
var existingLeaseExponent = 2;
var shardPrepTime = Duration.ofSeconds(59);
Duration initialLeaseDuration = Duration.ofMinutes(10);
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent - 1, successorNextAcquisitionLeaseExponent, "Should decrease successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThresholdWith0Exponent() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(1);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(0, successorNextAcquisitionLeaseExponent, "Should return 0 for successorExponent");
}


@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(59);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is less than 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_EqualToUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(60);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent when shard prep time is equal to 10% of lease duration");
}

@Test
public void testGetSuccessorNextAcquisitionLeaseExponent_ExceedsUpperThreshold() {
WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

var shardPrepTime = Duration.ofSeconds(61);
var existingLeaseExponent = 0;
var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);

workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
Duration initialLeaseDuration = Duration.ofMinutes(10);
Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));

int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);

Assertions.assertEquals(existingLeaseExponent + 1, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is greater than to 10% of lease duration");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package org.opensearch.migrations.bulkload;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.data.WorkloadGenerator;
import org.opensearch.migrations.data.WorkloadOptions;
import org.opensearch.migrations.data.workloads.Workloads;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;
import org.opensearch.migrations.testutils.ToxiProxyWrapper;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Network;

/**
* TODO - the code in this test was lifted from ProcessLifecycleTest.java
* Some of the functionality and code are shared between the two and should be refactored.
*/
@Slf4j
@Tag("isolatedTest")
public class LeaseExpirationTest extends SourceTestBase {

public static final String TARGET_DOCKER_HOSTNAME = "target";
public static final String SNAPSHOT_NAME = "test_snapshot";

@AllArgsConstructor
@Getter
private static class RunData {
Path tempDirSnapshot;
Path tempDirLucene;
ToxiProxyWrapper proxyContainer;
}

@Test
public void testProcessExitsAsExpected() {
// 2 Shards, for each shard, expect three status code 2 and one status code 0
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
int shards = 2;
int migrationProcessesPerShard = 4;
int continueExitCode = 2;
int finalExitCodePerShard = 0;
runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards,
finalExitCodePerShard, shards,
d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer
));
}

@SneakyThrows
private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCodes,
int eventualExitCode, int eventualExitCodeCount,
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
Function<RunData, Integer> processRunner) {
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");

try (
var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2)
.withAccessToHost(true);
var network = Network.newNetwork();
var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2)
.withAccessToHost(true)
.withNetwork(network)
.withNetworkAliases(TARGET_DOCKER_HOSTNAME);
var proxyContainer = new ToxiProxyWrapper(network)
) {
CompletableFuture.allOf(
CompletableFuture.runAsync(esSourceContainer::start),
CompletableFuture.runAsync(osTargetContainer::start)
).join();

proxyContainer.start("target", 9200);

// Populate the source cluster with data
var client = new OpenSearchClient(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
);
var generator = new WorkloadGenerator(client);
var workloadOptions = new WorkloadOptions();

var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl());

var shards = 2;
AndreKurait marked this conversation as resolved.
Show resolved Hide resolved
// Number of default shards is different across different versions on ES/OS.
// So we explicitly set it.
String body = String.format(
"{" +
" \"settings\": {" +
" \"index\": {" +
" \"number_of_shards\": %d," +
" \"number_of_replicas\": 0" +
" }" +
" }" +
"}",
shards
);
sourceClusterOperations.createIndex("geonames", body);


// Sending 5 docs per request with 4 requests concurrently with each taking 0.125 second is 160 docs/sec
// will process 9760 docs in 61 seconds. With 20s lease duration, expect to be finished in 4 leases
var docsPerShard = 9760;
workloadOptions.totalDocs = shards * docsPerShard;
workloadOptions.workloads = List.of(Workloads.GEONAMES);
workloadOptions.maxBulkBatchSize = 1000;
generator.generate(workloadOptions);

// Create the snapshot from the source cluster
var args = new CreateSnapshot.Args();
args.snapshotName = SNAPSHOT_NAME;
args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR;
args.sourceArgs.host = esSourceContainer.getUrl();

var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext());
snapshotCreator.run();

esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

int exitCode;
int initialExitCodeCount = 0;
int finalExitCodeCount = 0;
int runs = 0;
do {
exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer));
runs++;
if (exitCode == initialExitCode) {
initialExitCodeCount++;
}
if (exitCode == eventualExitCode) {
finalExitCodeCount++;
}
log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log();
// Clean tree for subsequent run
deleteTree(tempDirLucene);
} while (finalExitCodeCount < eventualExitCodeCount && runs < initialExitCodes * 2);

// Assert doc count on the target cluster matches source
checkClusterMigrationOnFinished(esSourceContainer, osTargetContainer,
DocumentMigrationTestContext.factory().noOtelTracking());

// Check if the final exit code is as expected
Assertions.assertEquals(
finalExitCodeCount,
eventualExitCodeCount,
"The program did not exit with the expected final exit code."
);

Assertions.assertEquals(
eventualExitCode,
exitCode,
"The program did not exit with the expected final exit code."
);

Assertions.assertEquals(
initialExitCodes,
initialExitCodeCount,
"The program did not exit with the expected number of " + initialExitCode +" exit codes"
);
} finally {
deleteTree(tempDirSnapshot);
}
}

@SneakyThrows
private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer
)
{
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
var latency = tp.toxics().latency("latency-toxic", ToxicDirection.UPSTREAM, 125);

// Set to less than 2x lease time to ensure leases aren't doubling
int timeoutSeconds = 30;
ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress);

var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
if (!finished) {
log.atError().setMessage("Process timed out, attempting to kill it...").log();
process.destroy(); // Try to be nice about things first...
if (!process.waitFor(10, TimeUnit.SECONDS)) {
log.atError().setMessage("Process still running, attempting to force kill it...").log();
process.destroyForcibly();
}
Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
}

latency.remove();

return process.exitValue();
}


@NotNull
private static ProcessBuilder setupProcess(
Path tempDirSnapshot,
Path tempDirLucene,
String targetAddress
) {
String classpath = System.getProperty("java.class.path");
String javaHome = System.getProperty("java.home");
String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

String[] args = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
tempDirSnapshot.toString(),
"--lucene-dir",
tempDirLucene.toString(),
"--target-host",
targetAddress,
"--index-allowlist",
"geonames",
"--documents-per-bulk-request",
"5",
"--max-connections",
"4",
"--source-version",
"ES_7_10",
"--initial-lease-duration",
"PT20s" };

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}")
.addArgument(() -> Arrays.toString(args))
.log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable,
Dismissed Show dismissed Hide dismissed
"-cp",
classpath,
"org.opensearch.migrations.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));
processBuilder.redirectErrorStream(true);
processBuilder.redirectOutput();
return processBuilder;
}

}
Loading
Loading