Modify Jenkins Full E2E Integ Test to perform Transformations #1182

Status: Open. Wants to merge 19 commits into main.

Changes from 15 commits
1 change: 1 addition & 0 deletions DocumentsFromSnapshotMigration/build.gradle
@@ -60,6 +60,7 @@ dependencies {
testImplementation group: 'org.testcontainers', name: 'toxiproxy'
testImplementation group: 'org.mockito', name: 'mockito-core'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
+testImplementation group: 'org.json', name: 'json'

testImplementation platform('io.projectreactor:reactor-bom:2023.0.5')
testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
New file: CustomTransformationTest.java
@@ -0,0 +1,282 @@
package org.opensearch.migrations.bulkload;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.opensearch.migrations.CreateSnapshot;
import org.opensearch.migrations.bulkload.common.RestClient;
import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams;
import org.opensearch.migrations.bulkload.framework.SearchClusterContainer;
import org.opensearch.migrations.bulkload.http.ClusterOperations;
import org.opensearch.migrations.bulkload.http.SearchClusterRequests;
import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext;
import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Network;

@Slf4j
@Tag("isolatedTest")
public class CustomTransformationTest extends SourceTestBase {

public static final String TARGET_DOCKER_HOSTNAME = "target";
public static final String SNAPSHOT_NAME = "test_snapshot";

@AllArgsConstructor
@Getter
private static class RunData {
Path tempDirSnapshot;
Path tempDirLucene;
SearchClusterContainer targetContainer;
}

@Test
public void testCustomTransformationProducesDesiredTargetClusterState() {
String nameTransformation = createIndexNameTransformation("geonames", "geonames_transformed");
var expectedSourceMap = new HashMap<String, Integer>();
expectedSourceMap.put("geonames", 1);
var expectedTargetMap = new HashMap<String, Integer>();
expectedTargetMap.put("geonames_transformed", 1);
// 2 shards; for each shard, expect three runs that exit with code 2 (continue) and one final run that exits with code 0
int shards = 2;
int migrationProcessesPerShard = 4;
int continueExitCode = 2;
int finalExitCodePerShard = 0;
runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards,
finalExitCodePerShard, shards, expectedSourceMap, expectedTargetMap,
d -> runProcessAgainstTarget(d.tempDirSnapshot, d.tempDirLucene, d.targetContainer, nameTransformation
));
}

@SneakyThrows
private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCodes,
int eventualExitCode, int eventualExitCodeCount,
Map<String, Integer> expectedSourceDocs,
Map<String, Integer> expectedTargetDocs,
Function<RunData, Integer> processRunner) {
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot");
var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");

try (
var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2)
.withAccessToHost(true);
var network = Network.newNetwork();
var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0)
.withAccessToHost(true)
.withNetwork(network)
.withNetworkAliases(TARGET_DOCKER_HOSTNAME);
) {
CompletableFuture.allOf(
CompletableFuture.runAsync(esSourceContainer::start),
CompletableFuture.runAsync(osTargetContainer::start)
).join();

var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl());

var shards = 2;
> Member: Non-blocking: why define this shard count again? Maybe you can use a class-level member instead of redefining.
>
> Author (Collaborator): This should be removed now; I've done a good amount of refactoring to strip out extra logic that this test class doesn't need.
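For illustration, the reviewer's suggestion amounts to hoisting the literal into a field shared by the index setup and the exit-code math (hypothetical name):

```java
// Hypothetical class-level constant replacing the duplicated literal 2
private static final int SOURCE_SHARD_COUNT = 2;
```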

// The number of default shards differs across versions of ES/OS,
// so we set it explicitly.
String body = String.format(
"{" +
" \"settings\": {" +
" \"index\": {" +
" \"number_of_shards\": %d," +
" \"number_of_replicas\": 0" +
" }" +
" }" +
"}",
shards
);
sourceClusterOperations.createIndex("geonames", body);
sourceClusterOperations.createDocument("geonames", "111", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}");

// Create the snapshot from the source cluster
var args = new CreateSnapshot.Args();
args.snapshotName = SNAPSHOT_NAME;
args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR;
args.sourceArgs.host = esSourceContainer.getUrl();

var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext());
snapshotCreator.run();

esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

int exitCode;
int finalExitCodeCount = 0;
int runs = 0;
do {
exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, osTargetContainer));
runs++;
if (exitCode == eventualExitCode) {
finalExitCodeCount++;
}
log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log();
// Clean tree for subsequent run
deleteTree(tempDirLucene);
} while (finalExitCodeCount < eventualExitCodeCount && runs < initialExitCodes * 2);

// Assert doc count on the source and target cluster match expected
validateFinalClusterDocs(
esSourceContainer,
osTargetContainer,
DocumentMigrationTestContext.factory().noOtelTracking(),
expectedSourceDocs,
expectedTargetDocs
);
} finally {
deleteTree(tempDirSnapshot);
}
}

private static String createIndexNameTransformation(String existingIndexName, String newIndexName) {
> Member: Can you please add a comment explaining what this transform is supposed to accomplish? The method name is a good start, but, frankly, Jolt is inscrutable.
>
> Author (Collaborator): Added.
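// Builds a transformation config (serialized to a JSON string) that renames an index.
// The structure assembled below is roughly:
//   [{ "JsonConditionalTransformerProvider": [
//     { "JsonJMESPathPredicateProvider": { "script": "index._index == '<existingIndexName>'" } },
//     [{ "JsonJoltTransformerProvider": { "script": {
//       "operation": "modify-overwrite-beta",
//       "spec": { "index": { "\\_index": "<newIndexName>" } } } } }]
//   ]}]
// i.e., when the predicate matches a document headed for <existingIndexName>, the
// Jolt transform overwrites its _index value with <newIndexName>.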

JSONArray rootArray = new JSONArray();
JSONObject firstObject = new JSONObject();
JSONArray jsonConditionalTransformerProvider = new JSONArray();

// JsonJMESPathPredicateProvider object
JSONObject jsonJMESPathPredicateProvider = new JSONObject();
jsonJMESPathPredicateProvider.put("script", String.format("index._index == '%s'", existingIndexName));
JSONObject jsonJMESPathPredicateWrapper = new JSONObject();
jsonJMESPathPredicateWrapper.put("JsonJMESPathPredicateProvider", jsonJMESPathPredicateProvider);
jsonConditionalTransformerProvider.put(jsonJMESPathPredicateWrapper);

JSONArray transformerList = new JSONArray();

// First JsonJoltTransformerProvider
JSONObject firstJoltTransformer = new JSONObject();
JSONObject firstJoltScript = new JSONObject();
firstJoltScript.put("operation", "modify-overwrite-beta");
firstJoltScript.put("spec", new JSONObject().put("index", new JSONObject().put("\\_index", newIndexName)));
firstJoltTransformer.put("JsonJoltTransformerProvider", new JSONObject().put("script", firstJoltScript));
transformerList.put(firstJoltTransformer);

jsonConditionalTransformerProvider.put(transformerList);
firstObject.put("JsonConditionalTransformerProvider", jsonConditionalTransformerProvider);
rootArray.put(firstObject);
return rootArray.toString();
}

@SneakyThrows
private static int runProcessAgainstTarget(
> Member: It looks like this was basically copied from ProcessLifecycleTest. Can you please find a way to share/re-use rather than copy?
>
> Author (Collaborator): I've done a good amount of refactoring, so this should be different now, with some shared code moved into the parent class.

Path tempDirSnapshot,
Path tempDirLucene,
SearchClusterContainer targetContainer,
String transformations
)
{
String targetAddress = targetContainer.getUrl();

int timeoutSeconds = 30;
ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, transformations);

var process = runAndMonitorProcess(processBuilder);
boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
if (!finished) {
log.atError().setMessage("Process timed out, attempting to kill it...").log();
process.destroy(); // Try to be nice about things first...
if (!process.waitFor(10, TimeUnit.SECONDS)) {
log.atError().setMessage("Process still running, attempting to force kill it...").log();
process.destroyForcibly();
}
Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds).");
}

return process.exitValue();
}
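As the review thread above suggests, the process-running scaffolding is meant to be shared rather than copied; a minimal sketch of that kind of extraction (hypothetical method name, assuming SourceTestBase is the shared parent class):

```java
// Hypothetical helper hoisted into the SourceTestBase parent class so that
// subclasses like CustomTransformationTest avoid duplicating process handling.
protected static int runToCompletion(ProcessBuilder processBuilder, int timeoutSeconds)
        throws IOException, InterruptedException {
    var process = processBuilder.start();
    if (!process.waitFor(timeoutSeconds, TimeUnit.SECONDS)) {
        process.destroyForcibly(); // escalate after the graceful timeout elapses
        throw new IllegalStateException("Process did not finish within " + timeoutSeconds + "s");
    }
    return process.exitValue();
}
```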


@NotNull
private static ProcessBuilder setupProcess(
> Member: It looks like this was basically copied from ProcessLifecycleTest. Can you please find a way to share/re-use rather than copy?
>
> Author (Collaborator): Done.

Path tempDirSnapshot,
Path tempDirLucene,
String targetAddress,
String transformations
) {
String classpath = System.getProperty("java.class.path");
String javaHome = System.getProperty("java.home");
String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java";

String[] args = {
"--snapshot-name",
SNAPSHOT_NAME,
"--snapshot-local-dir",
tempDirSnapshot.toString(),
"--lucene-dir",
tempDirLucene.toString(),
"--target-host",
targetAddress,
"--documents-per-bulk-request",
"5",
"--max-connections",
"4",
"--source-version",
"ES_7_10",
"--doc-transformer-config",
transformations,
};

// Kick off the doc migration process
log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}")
.addArgument(() -> Arrays.toString(args))
.log();
ProcessBuilder processBuilder = new ProcessBuilder(
javaExecutable,
"-cp",
classpath,
"org.opensearch.migrations.RfsMigrateDocuments"
);
processBuilder.command().addAll(Arrays.asList(args));
processBuilder.redirectErrorStream(true); // merge stderr into stdout for the process monitor
return processBuilder;
}

private static void validateFinalClusterDocs(
SearchClusterContainer esSourceContainer,
SearchClusterContainer osTargetContainer,
DocumentMigrationTestContext context,
Map<String, Integer> expectedSourceDocs,
Map<String, Integer> expectedTargetDocs
) {
var targetClient = new RestClient(ConnectionContextTestParams.builder()
.host(osTargetContainer.getUrl())
.build()
.toConnectionContext()
);
var sourceClient = new RestClient(ConnectionContextTestParams.builder()
.host(esSourceContainer.getUrl())
.build()
.toConnectionContext()
);

var requests = new SearchClusterRequests(context);
var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient);
var refreshResponse = targetClient.get("_refresh", context.createUnboundRequestContext());
Assertions.assertEquals(200, refreshResponse.statusCode);
var targetMap = requests.getMapOfIndexAndDocCount(targetClient);

MatcherAssert.assertThat(sourceMap, Matchers.equalTo(expectedSourceDocs));
MatcherAssert.assertThat(targetMap, Matchers.equalTo(expectedTargetDocs));
}

}
@@ -1,10 +1,10 @@
-### E2E Integration Testing
+## E2E Integration Testing
Developers can run a test script that verifies the end-to-end Docker Solution.

-#### Compatibility
+### Compatibility
* Python >= 3.7

-#### Pre-requisites
+### Pre-requisites

* Have all containers from Docker solution running.

@@ -16,6 +16,26 @@

```shell
pip install -r requirements.txt
pytest tests.py
```

### Running in Docker setup

From the root of this repository, bring up the Docker environment:
```shell
./gradlew -p TrafficCapture dockerSolution:ComposeUp -x test -x spotlessCheck --info --stacktrace
```

The Docker compose file being used can be found [here](../../../docker-compose.yml)
* The integ_test `lib` directory can be directly mounted as a volume on the migration console container to speed up development

To run one of the integration test suites, use a command like the one below:
```shell
docker exec $(docker ps --filter "name=migration-console" -q) pipenv run pytest /root/lib/integ_test/integ_test/full_tests.py --unique_id="testindex" -s
```

To tear down, execute the following command at the root of this repository:
```shell
./gradlew -p TrafficCapture dockerSolution:ComposeDown
```

#### Notes

##### Ports Setup
@@ -78,8 +78,9 @@ def execute_api_call(cluster: Cluster, path: str, method=HttpMethod.GET, data=None


 def create_index(index_name: str, cluster: Cluster, **kwargs):
+    headers = {'Content-Type': 'application/json'}
     return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}",
-                            **kwargs)
+                            headers=headers, **kwargs)


def get_index(index_name: str, cluster: Cluster, **kwargs):
@@ -221,3 +222,31 @@ def wait_for_running_replayer(replayer: Replayer,
test_case.fail(error_message)
else:
raise ReplayerNotActiveError(error_message)


def convert_transformations_to_str(transform_list: List[Dict]) -> str:
return json.dumps(transform_list)


def get_index_name_transformation(existing_index_name: str, target_index_name: str) -> Dict:
return {
"JsonConditionalTransformerProvider": [
{
"JsonJMESPathPredicateProvider": {
"script": f"name == '{existing_index_name}'"
}
},
[
{
"JsonJoltTransformerProvider": {
"script": {
"operation": "modify-overwrite-beta",
"spec": {
"name": f"{target_index_name}"
}
}
}
}
]
]
}
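
For example, a test can combine these helpers to produce the serialized config string that a transformation-aware migration command consumes (a sketch; the variable names are illustrative):

```python
# Build a single index-rename transformation and serialize it for use as a
# transformer-config argument.
transform_list = [get_index_name_transformation("geonames", "geonames_transformed")]
transform_config = convert_transformations_to_str(transform_list)
```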