Skip to content

Commit

Permalink
Refactor lease expiration test functionality into SourceTestBase
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Dec 12, 2024
1 parent 77f0244 commit 10acac7
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 144 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.opensearch.migrations.bulkload;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -23,47 +21,31 @@
import org.opensearch.migrations.testutils.ToxiProxyWrapper;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.testcontainers.containers.Network;

/**
* TODO - the code in this test was lifted from ProcessLifecycleTest.java
* Some of the functionality and code are shared between the two and should be refactored.
*/
@Tag("longTest")
@Slf4j
public class LeaseExpirationTest extends SourceTestBase {

public static final String TARGET_DOCKER_HOSTNAME = "target";
public static final String SNAPSHOT_NAME = "test_snapshot";

@AllArgsConstructor
@Getter
private static class RunData {
Path tempDirSnapshot;
Path tempDirLucene;
ToxiProxyWrapper proxyContainer;
}

@Tag("isolatedTest")
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testProcessExitsAsExpected(boolean forceMoreSegments) {
// Sending 5 docs per request with 4 requests concurrently with each taking 0.125 second is 160 docs/sec
// will process 9760 docs in 61 seconds. With 20s lease duration, expect to be finished in 4 leases.
// Sending 5 docs per request with 4 requests concurrently with each taking 0.250 second is 80 docs/sec
// will process 12880 docs in 61 seconds. With 40s lease duration, expect to be finished in 4 leases.
// This is ensured with the toxiproxy settings, the migration should not be able to be completed
// faster, but with a heavily loaded test environment, may be slower which is why this is marked as
// isolated.
// 2 Shards, for each shard, expect three status code 2 and one status code 0 (4 leases)
int shards = 2;
int indexDocCount = 9760 * shards;
int indexDocCount = 12880 * shards;
int migrationProcessesPerShard = 4;
int continueExitCode = 2;
int finalExitCodePerShard = 0;
Expand Down Expand Up @@ -196,15 +178,26 @@ private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer
)
{
) {
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
var latency = tp.toxics().latency("latency-toxic", ToxicDirection.UPSTREAM, 125);
var latency = tp.toxics().latency("latency-toxic", ToxicDirection.UPSTREAM, 250);

// Set to less than 2x lease time to ensure leases aren't doubling
int timeoutSeconds = 30;
ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress);
int timeoutSeconds = 60;

String[] additionalArgs = {
"--documents-per-bulk-request", "5",
"--max-connections", "4",
"--initial-lease-duration", "PT40s"
};

ProcessBuilder processBuilder = setupProcess(
tempDirSnapshot,
tempDirLucene,
targetAddress,
additionalArgs
);

var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
Expand All @@ -223,51 +216,4 @@ private static int runProcessAgainstToxicTarget(
return process.exitValue();
}


@NotNull
private static ProcessBuilder setupProcess(
Path tempDirSnapshot,
Path tempDirLucene,
String targetAddress
) {
String classpath = System.getProperty("java.class.path");
String javaHome = System.getProperty("java.home");
String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

String[] args = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
tempDirSnapshot.toString(),
"--lucene-dir",
tempDirLucene.toString(),
"--target-host",
targetAddress,
"--index-allowlist",
"geonames",
"--documents-per-bulk-request",
"5",
"--max-connections",
"4",
"--source-version",
"ES_7_10",
"--initial-lease-duration",
"PT20s" };

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}")
.addArgument(() -> Arrays.toString(args))
.log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable,
"-cp",
classpath,
"org.opensearch.migrations.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));
processBuilder.redirectErrorStream(true);
processBuilder.redirectOutput();
return processBuilder;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.FileSystemRepo;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
Expand Down Expand Up @@ -81,13 +80,7 @@ public void testDocumentMigration(
generator.generate(new WorkloadOptions());

// Create the snapshot from the source cluster
var args = new CreateSnapshot.Args();
args.snapshotName = "test_snapshot";
args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR;
args.sourceArgs.host = esSourceContainer.getUrl();

var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext());
snapshotCreator.run();
createSnapshot(esSourceContainer, "test_snapshot", testSnapshotContext);

final List<String> INDEX_ALLOWLIST = List.of();
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
Expand All @@ -103,7 +96,7 @@ public void testDocumentMigration(
CompletableFuture.supplyAsync(
() -> migrateDocumentsSequentially(
sourceRepo,
args.snapshotName,
"test_snapshot",
INDEX_ALLOWLIST,
osTargetContainer.getUrl(),
runCounter,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package org.opensearch.migrations.bulkload;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -24,25 +21,18 @@
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.testcontainers.containers.Network;

/**
* TODO - the code in this test was lifted from FullTest.java (now named ParallelDocumentMigrationsTest.java).
* Some of the functionality and code are shared between the two and should be refactored.
*/
@Slf4j
@Tag("longTest")
public class ProcessLifecycleTest extends SourceTestBase {

public static final String TARGET_DOCKER_HOSTNAME = "target";
public static final String SNAPSHOT_NAME = "test_snapshot";
public static final List<String> INDEX_ALLOWLIST = List.of();
public static final int OPENSEARCH_PORT = 9200;

enum FailHow {
Expand Down Expand Up @@ -166,8 +156,8 @@ private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer,
FailHow failHow)
{
FailHow failHow
) {
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
if (failHow == FailHow.AT_STARTUP) {
Expand All @@ -177,7 +167,21 @@ private static int runProcessAgainstToxicTarget(
}

int timeoutSeconds = 90;
ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, failHow);

String initialLeaseDuration = failHow == FailHow.NEVER ? "PT10M" : "PT1S";

String[] additionalArgs = {
"--documents-per-bulk-request", "10",
"--max-connections", "1",
"--initial-lease-duration", initialLeaseDuration
};

ProcessBuilder processBuilder = setupProcess(
tempDirSnapshot,
tempDirLucene,
targetAddress,
additionalArgs
);

var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
Expand All @@ -193,52 +197,4 @@ private static int runProcessAgainstToxicTarget(

return process.exitValue();
}


@NotNull
private static ProcessBuilder setupProcess(
Path tempDirSnapshot,
Path tempDirLucene,
String targetAddress,
FailHow failHow
) {
String classpath = System.getProperty("java.class.path");
String javaHome = System.getProperty("java.home");
String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

String[] args = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
tempDirSnapshot.toString(),
"--lucene-dir",
tempDirLucene.toString(),
"--target-host",
targetAddress,
"--index-allowlist",
"geonames",
"--documents-per-bulk-request",
"10",
"--max-connections",
"1",
"--source-version",
"ES_7_10",
"--initial-lease-duration",
failHow == FailHow.NEVER ? "PT10M" : "PT1S" };

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}")
.addArgument(() -> Arrays.toString(args))
.log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable,
"-cp",
classpath,
"org.opensearch.migrations.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));
processBuilder.redirectErrorStream(true);
processBuilder.redirectOutput();
return processBuilder;
}
}
Loading

0 comments on commit 10acac7

Please sign in to comment.