From 569a29249af6db88d6587a3ca0a7d227ebbf9cbc Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Wed, 4 Dec 2024 20:29:18 -0600 Subject: [PATCH 01/16] Add basic name change transform to verify Signed-off-by: Tanner Lewis --- .../migrationConsole/lib/integ_test/README.md | 26 ++++++++++++++-- .../integ_test/common_operations.py | 30 +++++++++++++++++-- .../lib/integ_test/integ_test/full_tests.py | 26 +++++++++++++--- vars/fullES68SourceE2ETest.groovy | 6 ++-- 4 files changed, 76 insertions(+), 12 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md index e6b2c8b37..8bd0ed07f 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/README.md @@ -1,10 +1,10 @@ -### E2E Integration Testing +## E2E Integration Testing Developers can run a test script which will verify the end-to-end Docker Solution. -#### Compatibility +### Compatibility * Python >= 3.7 -#### Pre-requisites +### Pre-requisites * Have all containers from Docker solution running. @@ -16,6 +16,26 @@ pip install -r requirements.txt pytest tests.py ``` +### Running in Docker setup + +From the root of this repository bring up the Docker environment +```shell +./gradlew -p TrafficCapture dockerSolution:ComposeUp -x test -x spotlessCheck --info --stacktrace +``` + +The Docker compose file being used can be found [here](../../../docker-compose.yml) +* The integ_test `lib` directory can be directly mounted as a volume on the migration console container to spe + +To run one of the integration test suites a command like below can be used: +```shell +docker exec $(docker ps --filter "name=migration-console" -q) pipenv run pytest /root/lib/integ_test/integ_test/full_tests.py --unique_id="testindex" -s +``` + +To teardown, execute the following command at the root of this repository +```shell +./gradlew -p TrafficCapture dockerSolution:ComposeDown +``` + #### Notes ##### Ports Setup diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py index abf96c898..63eea5bcd 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py @@ -78,9 +78,9 @@ def execute_api_call(cluster: Cluster, path: str, method=HttpMethod.GET, data=No def create_index(index_name: str, cluster: Cluster, **kwargs): + headers = {'Content-Type': 'application/json'} return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}", - **kwargs) - + headers=headers, **kwargs) def get_index(index_name: str, cluster: Cluster, **kwargs): return execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}", @@ -221,3 +221,29 @@ def wait_for_running_replayer(replayer: Replayer, test_case.fail(error_message) else: raise ReplayerNotActiveError(error_message) + +def convert_transformations_to_str(transform_list: List[Dict]) -> str: + return json.dumps(transform_list) + +def get_index_name_transformation(existing_index_name: str, target_index_name: str) -> Dict: + return { + "JsonConditionalTransformerProvider": [ + { + "JsonJMESPathPredicateProvider": { + "script": f"name == '{existing_index_name}'" + } + }, + [ + { + "JsonJoltTransformerProvider": { + "script": { + "operation": "modify-overwrite-beta", + "spec": { + "name": f"{target_index_name}" + } + } + } + } + ] + ] + } \ No newline at end of file diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py index fa591c297..09c66f5b2 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py @@ -1,8 +1,10 @@ +import json import logging import os import pytest import unittest from http import HTTPStatus + from console_link.middleware.clusters import connection_check, clear_indices, ConnectionResult from console_link.models.cluster import Cluster from console_link.models.backfill_base import Backfill @@ -13,7 +15,8 @@ from console_link.middleware.kafka import delete_topic from console_link.models.metadata import Metadata from console_link.cli import Context -from common_operations import (create_index, create_document, check_doc_counts_match, wait_for_running_replayer) +from common_operations import (create_index, create_document, check_doc_counts_match, wait_for_running_replayer, + get_index_name_transformation, convert_transformations_to_str) logger = logging.getLogger(__name__) @@ -88,21 +91,36 @@ def test_e2e_0001_default(self): # Load initial data index_name = f"test_e2e_0001_{pytest.unique_id}" + transformed_index = f"{index_name}_transformed" doc_id_base = "e2e_0001_doc" - create_index(cluster=source_cluster, index_name=index_name, test_case=self) + index_body = { + 'settings': { + 'index': { + 'number_of_shards': 3, + 'number_of_replicas': 0 + } + } + } + create_index(cluster=source_cluster, index_name=index_name, data=json.dumps(index_body), test_case=self) create_document(cluster=source_cluster, index_name=index_name, doc_id=doc_id_base + "_1", expected_status_code=HTTPStatus.CREATED, test_case=self) - # Perform metadata and backfill migrations backfill.create() + # Delete existing snapshot if present and create a new snapshot snapshot: Snapshot = pytest.console_env.snapshot status_result: CommandResult = snapshot.status() if status_result.success: snapshot.delete() snapshot_result: CommandResult = snapshot.create(wait=True) assert snapshot_result.success - metadata_result: CommandResult = metadata.migrate() + + # Perform metadata migration with a transform to index name + index_name_transform = get_index_name_transformation(existing_index_name=index_name, + target_index_name=transformed_index) + transform_arg = convert_transformations_to_str(transform_list=[index_name_transform]) + metadata_result: CommandResult = metadata.migrate(extra_args=["--transformer-config", transform_arg]) assert metadata_result.success + backfill_start_result: CommandResult = backfill.start() assert backfill_start_result.success # small enough to allow containers to be reused, big enough to test scaling out diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index d04651379..fc538fc6c 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -7,8 +7,8 @@ def call(Map config = [:]) { "source-single-node-ec2": { "suffix": "ec2-source-", "networkStackSuffix": "ec2-source-", - "distVersion": "6.8.23", - "distributionUrl": "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-6.8.23.tar.gz", + "distVersion": "5.6.16", + "distributionUrl": "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.16.tar.gz", "captureProxyEnabled": false, "securityDisabled": true, "minDistribution": false, @@ -43,7 +43,7 @@ def call(Map config = [:]) { "sourceCluster": { "endpoint": "", "auth": {"type": "none"}, - "version": "ES_6.8.23" + "version": "ES_5.6.16" }, "tlsSecurityPolicy": "TLS_1_2", "enforceHTTPS": true, From 413136f95c4efb8a0e4515a773669c2b763ba4ee Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 09:33:42 -0600 Subject: [PATCH 02/16] Revert ES5 change and attempt RFS transform Signed-off-by: Tanner Lewis --- vars/fullES68SourceE2ETest.groovy | 48 +++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index fc538fc6c..30d213cbe 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -1,14 +1,55 @@ +import groovy.json.JsonOutput def call(Map config = [:]) { def sourceContextId = 'source-single-node-ec2' def migrationContextId = 'full-migration' + def time = new Date().getTime() + def uniqueId = "integ_min_${time}_${currentBuild.number}" + def jsonTransformers = [ + [ + JsonConditionalTransformerProvider: [ + [ + JsonJMESPathPredicateProvider: [ + script: "name == 'test_e2e_0001_$uniqueId'" + ] + ], + [ + [ + JsonJoltTransformerProvider: [ + script: [ + operation: "modify-overwrite-beta", + spec: [ + name: "transformed_index" + ] + ] + ] + ], + [ + JsonJoltTransformerProvider: [ + script: [ + operation: "modify-overwrite-beta", + spec: [ + settings: [ + index: [ + number_of_replicas: 3 + ] + ] + ] + ] + ] + ] + ] + ] + ] + ] + def transformersArg = JsonOutput.toJson(jsonTransformers) def source_cdk_context = """ { "source-single-node-ec2": { "suffix": "ec2-source-", "networkStackSuffix": "ec2-source-", - "distVersion": "5.6.16", - "distributionUrl": "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.6.16.tar.gz", + "distVersion": "6.8.23", + "distributionUrl": "https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-oss-6.8.23.tar.gz", "captureProxyEnabled": false, "securityDisabled": true, "minDistribution": false, @@ -40,10 +81,11 @@ def call(Map config = [:]) { "trafficReplayerServiceEnabled": true, "trafficReplayerExtraArgs": "--speedup-factor 10.0", "reindexFromSnapshotServiceEnabled": true, + "reindexFromSnapshotExtraArgs": "--transformer-config $transformersArg" "sourceCluster": { "endpoint": "", "auth": {"type": "none"}, - "version": "ES_5.6.16" + "version": "ES_6.8.23" }, "tlsSecurityPolicy": "TLS_1_2", "enforceHTTPS": true, From e171f0f1f9d3314235a0539e92e2c1cfaf83535d Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 09:53:46 -0600 Subject: [PATCH 03/16] Update to base64 Signed-off-by: Tanner Lewis --- vars/fullES68SourceE2ETest.groovy | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index 30d213cbe..a884c7195 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -42,7 +42,8 @@ def call(Map config = [:]) { ] ] ] - def transformersArg = JsonOutput.toJson(jsonTransformers) + def jsonString = JsonOutput.toJson(jsonTransformers) + def transformersArg = jsonString.bytes.encodeBase64().toString() def source_cdk_context = """ { "source-single-node-ec2": { @@ -81,7 +82,7 @@ def call(Map config = [:]) { "trafficReplayerServiceEnabled": true, "trafficReplayerExtraArgs": "--speedup-factor 10.0", "reindexFromSnapshotServiceEnabled": true, - "reindexFromSnapshotExtraArgs": "--transformer-config $transformersArg" + "reindexFromSnapshotExtraArgs": "--transformer-config-base64 $transformersArg" "sourceCluster": { "endpoint": "", "auth": {"type": "none"}, From 90cee67f75a2a7617e939259d3bcac875a5d367e Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 09:59:22 -0600 Subject: [PATCH 04/16] Update to add comma Signed-off-by: Tanner Lewis --- vars/fullES68SourceE2ETest.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index a884c7195..761d3d0d8 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -82,7 +82,7 @@ def call(Map config = [:]) { "trafficReplayerServiceEnabled": true, "trafficReplayerExtraArgs": "--speedup-factor 10.0", "reindexFromSnapshotServiceEnabled": true, - "reindexFromSnapshotExtraArgs": "--transformer-config-base64 $transformersArg" + "reindexFromSnapshotExtraArgs": "--transformer-config-base64 $transformersArg", "sourceCluster": { "endpoint": "", "auth": {"type": "none"}, From d0d91a3715e133cb1f048a58d3c8e06e7d963684 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 10:31:52 -0600 Subject: [PATCH 05/16] Update pass unique id and use proper arg Signed-off-by: Tanner Lewis --- vars/defaultIntegPipeline.groovy | 10 +++++----- vars/fullES68SourceE2ETest.groovy | 7 ++++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/vars/defaultIntegPipeline.groovy b/vars/defaultIntegPipeline.groovy index 95dc4fd48..30ccafe5d 100644 --- a/vars/defaultIntegPipeline.groovy +++ b/vars/defaultIntegPipeline.groovy @@ -20,6 +20,8 @@ def call(Map config = [:]) { def source_context_file_name = 'sourceJenkinsContext.json' def migration_context_file_name = 'migrationJenkinsContext.json' def skipCaptureProxyOnNodeSetup = config.skipCaptureProxyOnNodeSetup ?: false + def time = new Date().getTime() + def testUniqueId = config.testUniqueId ?: "integ_full_${time}_${currentBuild.number}" def testDir = "/root/lib/integ_test/integ_test" def integTestCommand = config.integTestCommand ?: "${testDir}/replayer_tests.py" pipeline { @@ -153,13 +155,11 @@ def call(Map config = [:]) { if (config.integTestStep) { config.integTestStep() } else { - def time = new Date().getTime() - def uniqueId = "integ_min_${time}_${currentBuild.number}" - def test_result_file = "${testDir}/reports/${uniqueId}/report.xml" + def test_result_file = "${testDir}/reports/${testUniqueId}/report.xml" def populatedIntegTestCommand = integTestCommand.replaceAll("", stage) - def command = "pipenv run pytest --log-file=${testDir}/reports/${uniqueId}/pytest.log " + + def command = "pipenv run pytest --log-file=${testDir}/reports/${testUniqueId}/pytest.log " + "--junitxml=${test_result_file} ${populatedIntegTestCommand} " + - "--unique_id ${uniqueId} " + + "--unique_id ${testUniqueId} " + "--stage ${stage} " + "-s" withCredentials([string(credentialsId: 'migrations-test-account-id', variable: 'MIGRATIONS_TEST_ACCOUNT_ID')]) { diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index 761d3d0d8..82792d577 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -4,13 +4,13 @@ def call(Map config = [:]) { def sourceContextId = 'source-single-node-ec2' def migrationContextId = 'full-migration' def time = new Date().getTime() - def uniqueId = "integ_min_${time}_${currentBuild.number}" + def testUniqueId = "integ_full_${time}_${currentBuild.number}" def jsonTransformers = [ [ JsonConditionalTransformerProvider: [ [ JsonJMESPathPredicateProvider: [ - script: "name == 'test_e2e_0001_$uniqueId'" + script: "name == 'test_e2e_0001_$testUniqueId'" ] ], [ @@ -82,7 +82,7 @@ def call(Map config = [:]) { "trafficReplayerServiceEnabled": true, "trafficReplayerExtraArgs": "--speedup-factor 10.0", "reindexFromSnapshotServiceEnabled": true, - "reindexFromSnapshotExtraArgs": "--transformer-config-base64 $transformersArg", + "reindexFromSnapshotExtraArgs": "--doc-transformer-config-base64 $transformersArg", "sourceCluster": { "endpoint": "", "auth": {"type": "none"}, @@ -112,6 +112,7 @@ def call(Map config = [:]) { defaultStageId: 'full-es68', skipCaptureProxyOnNodeSetup: true, jobName: 'full-es68source-e2e-test', + testUniqueId: testUniqueId, integTestCommand: '/root/lib/integ_test/integ_test/full_tests.py --source_proxy_alb_endpoint https://alb.migration..local:9201 --target_proxy_alb_endpoint https://alb.migration..local:9202' ) } From 79fe4bdeb8a968ed1b2be544d390e6922a4a31ea Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 11:13:06 -0600 Subject: [PATCH 06/16] Modify RFS transformation Signed-off-by: Tanner Lewis --- vars/fullES68SourceE2ETest.groovy | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index 82792d577..fc3a36103 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -19,21 +19,7 @@ def call(Map config = [:]) { script: [ operation: "modify-overwrite-beta", spec: [ - name: "transformed_index" - ] - ] - ] - ], - [ - JsonJoltTransformerProvider: [ - script: [ - operation: "modify-overwrite-beta", - spec: [ - settings: [ - index: [ - number_of_replicas: 3 - ] - ] + name: "test_e2e_0001_${testUniqueId}_transformed" ] ] ] From 79b7af06316b7c1ddf673671065f11d9de1abd1a Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 11:45:52 -0600 Subject: [PATCH 07/16] Update transform Signed-off-by: Tanner Lewis --- .../lib/integ_test/integ_test/full_tests.py | 6 +++--- vars/fullES68SourceE2ETest.groovy | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py index 09c66f5b2..e38fb4752 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py @@ -133,11 +133,11 @@ def test_e2e_0001_default(self): ignore_list = [".", "searchguard", "sg7", "security-auditlog", "reindexed-logs"] expected_docs = {} # Source should have both documents - expected_docs[index_name] = {"count": 2} + expected_docs[transformed_index] = {"count": 2} check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_docs, index_prefix_ignore_list=ignore_list, test_case=self) # Target should have one document from snapshot - expected_docs[index_name] = {"count": 1} + expected_docs[transformed_index] = {"count": 1} check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_docs, index_prefix_ignore_list=ignore_list, max_attempts=20, delay=30.0, test_case=self) @@ -149,7 +149,7 @@ def test_e2e_0001_default(self): replayer.start() wait_for_running_replayer(replayer=replayer) - expected_docs[index_name] = {"count": 3} + expected_docs[transformed_index] = {"count": 3} check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_docs, index_prefix_ignore_list=ignore_list, test_case=self) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index fc3a36103..9a5f0cbe8 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -10,7 +10,7 @@ def call(Map config = [:]) { JsonConditionalTransformerProvider: [ [ JsonJMESPathPredicateProvider: [ - script: "name == 'test_e2e_0001_$testUniqueId'" + script: "index._index == 'test_e2e_0001_$testUniqueId'" ] ], [ @@ -19,7 +19,7 @@ def call(Map config = [:]) { script: [ operation: "modify-overwrite-beta", spec: [ - name: "test_e2e_0001_${testUniqueId}_transformed" + 'index.\\_index': "test_e2e_0001_${testUniqueId}_transformed" ] ] ] From 01a7d7c262c30a89029bc2e981756c37c621b605 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 15:49:33 -0600 Subject: [PATCH 08/16] Modify transformations for RFS and Replayer Signed-off-by: Tanner Lewis --- vars/fullES68SourceE2ETest.groovy | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index 9a5f0cbe8..29f95e6dc 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -5,7 +5,7 @@ def call(Map config = [:]) { def migrationContextId = 'full-migration' def time = new Date().getTime() def testUniqueId = "integ_full_${time}_${currentBuild.number}" - def jsonTransformers = [ + def rfsJsonTransformations = [ [ JsonConditionalTransformerProvider: [ [ @@ -19,7 +19,9 @@ def call(Map config = [:]) { script: [ operation: "modify-overwrite-beta", spec: [ - 'index.\\_index': "test_e2e_0001_${testUniqueId}_transformed" + index: [ + '\\_index': "test_e2e_0001_${testUniqueId}_transformed" + ] ] ] ] @@ -28,8 +30,19 @@ def call(Map config = [:]) { ] ] ] - def jsonString = JsonOutput.toJson(jsonTransformers) - def transformersArg = jsonString.bytes.encodeBase64().toString() + def rfsJsonString = JsonOutput.toJson(rfsJsonTransformations) + def rfsTransformersArg = rfsJsonString.bytes.encodeBase64().toString() + def replayerJsonTransformations = [ + [ + "JsonJMESPathTransformerProvider": [ + "script": [ + "URI": replace(URI, "test_e2e_0001_$testUniqueId", "test_e2e_0001_${testUniqueId}_transformed") + ] + ] + ] + ] + def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) + def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() def source_cdk_context = """ { "source-single-node-ec2": { @@ -66,9 +79,9 @@ def call(Map config = [:]) { "captureProxyServiceEnabled": true, "targetClusterProxyServiceEnabled": true, "trafficReplayerServiceEnabled": true, - "trafficReplayerExtraArgs": "--speedup-factor 10.0", + "trafficReplayerExtraArgs": "--speedup-factor 10.0 --transformer-config-encoded $replayerTransformersArg", "reindexFromSnapshotServiceEnabled": true, - "reindexFromSnapshotExtraArgs": "--doc-transformer-config-base64 $transformersArg", + "reindexFromSnapshotExtraArgs": "--doc-transformer-config-base64 $rfsTransformersArg", "sourceCluster": { "endpoint": "", "auth": {"type": "none"}, From b2cc0e115b126e7897f28100bd4755e1a3e13918 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 15:54:32 -0600 Subject: [PATCH 09/16] Remove Replayer transformation for now Signed-off-by: Tanner Lewis --- vars/fullES68SourceE2ETest.groovy | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index 29f95e6dc..c1101ef59 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -32,17 +32,17 @@ def call(Map config = [:]) { ] def rfsJsonString = JsonOutput.toJson(rfsJsonTransformations) def rfsTransformersArg = rfsJsonString.bytes.encodeBase64().toString() - def replayerJsonTransformations = [ - [ - "JsonJMESPathTransformerProvider": [ - "script": [ - "URI": replace(URI, "test_e2e_0001_$testUniqueId", "test_e2e_0001_${testUniqueId}_transformed") - ] - ] - ] - ] - def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) - def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() +// def replayerJsonTransformations = [ +// [ +// "JsonJMESPathTransformerProvider": [ +// "script": [ +// "URI": replace(URI, "test_e2e_0001_$testUniqueId", "test_e2e_0001_${testUniqueId}_transformed") +// ] +// ] +// ] +// ] +// def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) +// def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() def source_cdk_context = """ { "source-single-node-ec2": { @@ -79,7 +79,7 @@ def call(Map config = [:]) { "captureProxyServiceEnabled": true, "targetClusterProxyServiceEnabled": true, "trafficReplayerServiceEnabled": true, - "trafficReplayerExtraArgs": "--speedup-factor 10.0 --transformer-config-encoded $replayerTransformersArg", + "trafficReplayerExtraArgs": "--speedup-factor 10.0", "reindexFromSnapshotServiceEnabled": true, "reindexFromSnapshotExtraArgs": "--doc-transformer-config-base64 $rfsTransformersArg", "sourceCluster": { From fae7986f28d6e1ae76142eeee74df7e933cb51fb Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 16:39:10 -0600 Subject: [PATCH 10/16] Try replayer transformation Signed-off-by: Tanner Lewis --- vars/fullES68SourceE2ETest.groovy | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index c1101ef59..8f32f0da9 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -32,17 +32,21 @@ def call(Map config = [:]) { ] def rfsJsonString = JsonOutput.toJson(rfsJsonTransformations) def rfsTransformersArg = rfsJsonString.bytes.encodeBase64().toString() -// def replayerJsonTransformations = [ -// [ -// "JsonJMESPathTransformerProvider": [ -// "script": [ -// "URI": replace(URI, "test_e2e_0001_$testUniqueId", "test_e2e_0001_${testUniqueId}_transformed") -// ] -// ] -// ] -// ] -// def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) -// def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() + def replayerJsonTransformations = [ + [ + "JsonJMESPathTransformerProvider": [ + "script": [ + "transformerMessageVersion": "transformerMessageVersion", + "URI": "replace(URI, 'test_e2e_0001_$testUniqueId', 'test_e2e_0001_${testUniqueId}_transformed')", + "method": "method", + "headers": "headers", + "payload": "payload" + ] + ] + ] + ] + def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) + def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() def source_cdk_context = """ { "source-single-node-ec2": { @@ -79,7 +83,7 @@ def call(Map config = [:]) { "captureProxyServiceEnabled": true, "targetClusterProxyServiceEnabled": true, "trafficReplayerServiceEnabled": true, - "trafficReplayerExtraArgs": "--speedup-factor 10.0", + "trafficReplayerExtraArgs": "--speedup-factor 10.0 --transformer-config-encoded $replayerTransformersArg", "reindexFromSnapshotServiceEnabled": true, "reindexFromSnapshotExtraArgs": "--doc-transformer-config-base64 $rfsTransformersArg", "sourceCluster": { From a6f51ef0a2a892c539235757a39a3f0ec3439f84 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 17:02:57 -0600 Subject: [PATCH 11/16] Update checks Signed-off-by: Tanner Lewis --- .../lib/integ_test/integ_test/full_tests.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py index e38fb4752..94d147a96 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py @@ -91,7 +91,7 @@ def test_e2e_0001_default(self): # Load initial data index_name = f"test_e2e_0001_{pytest.unique_id}" - transformed_index = f"{index_name}_transformed" + transformed_index_name = f"{index_name}_transformed" doc_id_base = "e2e_0001_doc" index_body = { 'settings': { @@ -116,7 +116,7 @@ def test_e2e_0001_default(self): # Perform metadata migration with a transform to index name index_name_transform = get_index_name_transformation(existing_index_name=index_name, - target_index_name=transformed_index) + target_index_name=transformed_index_name) transform_arg = convert_transformations_to_str(transform_list=[index_name_transform]) metadata_result: CommandResult = metadata.migrate(extra_args=["--transformer-config", transform_arg]) assert metadata_result.success @@ -131,14 +131,15 @@ def test_e2e_0001_default(self): expected_status_code=HTTPStatus.CREATED, test_case=self) ignore_list = [".", "searchguard", "sg7", "security-auditlog", "reindexed-logs"] - expected_docs = {} + expected_source_docs = {} + expected_target_docs = {} # Source should have both documents - expected_docs[transformed_index] = {"count": 2} - check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_docs, + expected_source_docs[index_name] = {"count": 2} + check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, index_prefix_ignore_list=ignore_list, test_case=self) # Target should have one document from snapshot - expected_docs[transformed_index] = {"count": 1} - check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_docs, + expected_target_docs[transformed_index_name] = {"count": 1} + check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, index_prefix_ignore_list=ignore_list, max_attempts=20, delay=30.0, test_case=self) backfill.stop() @@ -149,9 +150,9 @@ def test_e2e_0001_default(self): replayer.start() wait_for_running_replayer(replayer=replayer) - expected_docs[transformed_index] = {"count": 3} - check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_docs, + expected_source_docs[index_name] = {"count": 3} + expected_target_docs[transformed_index_name] = {"count": 3} + check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, index_prefix_ignore_list=ignore_list, test_case=self) - - check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_docs, + check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, index_prefix_ignore_list=ignore_list, max_attempts=30, delay=10.0, test_case=self) From d3f0d21480654c4b85ab9bb6bbce27f30a195bed Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 5 Dec 2024 18:36:06 -0600 Subject: [PATCH 12/16] Handle linting fixes Signed-off-by: Tanner Lewis --- DocumentsFromSnapshotMigration/build.gradle | 1 + .../bulkload/CustomTransformationTest.java | 286 ++++++++++++++++++ .../integ_test/common_operations.py | 5 +- .../lib/integ_test/integ_test/full_tests.py | 3 +- .../build.gradle | 3 + vars/fullES68SourceE2ETest.groovy | 32 +- 6 files changed, 312 insertions(+), 18 deletions(-) create mode 100644 DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java diff --git a/DocumentsFromSnapshotMigration/build.gradle b/DocumentsFromSnapshotMigration/build.gradle index f74fc5b62..143c1afa6 100644 --- a/DocumentsFromSnapshotMigration/build.gradle +++ b/DocumentsFromSnapshotMigration/build.gradle @@ -60,6 +60,7 @@ dependencies { testImplementation group: 'org.testcontainers', name: 'toxiproxy' testImplementation group: 'org.mockito', name: 'mockito-core' testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter' + testImplementation group: 'org.json', name: 'json' testImplementation platform('io.projectreactor:reactor-bom:2023.0.5') testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine' diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java new file mode 100644 index 000000000..09cbed807 --- /dev/null +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java @@ -0,0 +1,286 @@ +package org.opensearch.migrations.bulkload; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import org.opensearch.migrations.CreateSnapshot; +import org.opensearch.migrations.bulkload.common.RestClient; +import org.opensearch.migrations.bulkload.common.http.ConnectionContextTestParams; +import org.opensearch.migrations.bulkload.framework.SearchClusterContainer; +import org.opensearch.migrations.bulkload.http.ClusterOperations; +import org.opensearch.migrations.bulkload.http.SearchClusterRequests; +import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; +import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.jetbrains.annotations.NotNull; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; + +@Slf4j +@Tag("isolatedTest") +public class CustomTransformationTest extends SourceTestBase { + + public static final String TARGET_DOCKER_HOSTNAME = "target"; + public static final String SNAPSHOT_NAME = "test_snapshot"; + + @AllArgsConstructor + @Getter + private static class RunData { + Path tempDirSnapshot; + Path tempDirLucene; + SearchClusterContainer targetContainer; + } + + @Test + public void testProcessExitsAsExpected() { + String nameTransformation = createIndexNameTransformation("geonames", "geonames_transformed"); + var expectedSourceMap = new HashMap(); + expectedSourceMap.put("geonames", 1); + var expectedTargetMap = new HashMap(); + expectedTargetMap.put("geonames_transformed", 1); + // 2 Shards, for each shard, expect three status code 2 and one status code 0 + int shards = 2; + int migrationProcessesPerShard = 4; + int continueExitCode = 2; + int finalExitCodePerShard = 0; + runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards, + finalExitCodePerShard, shards, expectedSourceMap, expectedTargetMap, + d -> runProcessAgainstTarget(d.tempDirSnapshot, d.tempDirLucene, d.targetContainer, nameTransformation + )); + } + + @SneakyThrows + private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCodes, + int eventualExitCode, int eventualExitCodeCount, + Map expectedSourceDocs, + Map expectedTargetDocs, + Function processRunner) { + final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); + + var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); + var tempDirLucene = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene"); + + try ( + var esSourceContainer = new SearchClusterContainer(SearchClusterContainer.ES_V7_10_2) + .withAccessToHost(true); + var network = Network.newNetwork(); + var osTargetContainer = new SearchClusterContainer(SearchClusterContainer.OS_V2_14_0) + .withAccessToHost(true) + .withNetwork(network) + .withNetworkAliases(TARGET_DOCKER_HOSTNAME); + ) { + CompletableFuture.allOf( + CompletableFuture.runAsync(esSourceContainer::start), + CompletableFuture.runAsync(osTargetContainer::start) + ).join(); + + var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl()); + + var shards = 2; + // Number of default shards is different across different versions on ES/OS. + // So we explicitly set it. + String body = String.format( + "{" + + " \"settings\": {" + + " \"index\": {" + + " \"number_of_shards\": %d," + + " \"number_of_replicas\": 0" + + " }" + + " }" + + "}", + shards + ); + sourceClusterOperations.createIndex("geonames", body); + sourceClusterOperations.createDocument("geonames", "111", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}"); + + // Create the snapshot from the source cluster + var args = new CreateSnapshot.Args(); + args.snapshotName = SNAPSHOT_NAME; + args.fileSystemRepoPath = SearchClusterContainer.CLUSTER_SNAPSHOT_DIR; + args.sourceArgs.host = esSourceContainer.getUrl(); + + var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext()); + snapshotCreator.run(); + + esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); + + int exitCode; + int initialExitCodeCount = 0; + int finalExitCodeCount = 0; + int runs = 0; + do { + exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, osTargetContainer)); + runs++; + if (exitCode == initialExitCode) { + initialExitCodeCount++; + } + if (exitCode == eventualExitCode) { + finalExitCodeCount++; + } + log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log(); + // Clean tree for subsequent run + deleteTree(tempDirLucene); + } while (finalExitCodeCount < eventualExitCodeCount && runs < initialExitCodes * 2); + + // Assert doc count on the source and target cluster match expected + validateFinalClusterDocs( + esSourceContainer, + osTargetContainer, + DocumentMigrationTestContext.factory().noOtelTracking(), + expectedSourceDocs, + expectedTargetDocs + ); + } finally { + deleteTree(tempDirSnapshot); + } + } + + private static String createIndexNameTransformation(String existingIndexName, String newIndexName) { + JSONArray rootArray = new JSONArray(); + JSONObject firstObject = new JSONObject(); + JSONArray jsonConditionalTransformerProvider = new JSONArray(); + + // JsonJMESPathPredicateProvider object + JSONObject jsonJMESPathPredicateProvider = new JSONObject(); + jsonJMESPathPredicateProvider.put("script", String.format("index._index == '%s'", existingIndexName)); + JSONObject jsonJMESPathPredicateWrapper = new JSONObject(); + jsonJMESPathPredicateWrapper.put("JsonJMESPathPredicateProvider", jsonJMESPathPredicateProvider); + jsonConditionalTransformerProvider.put(jsonJMESPathPredicateWrapper); + + JSONArray transformerList = new JSONArray(); + + // First JsonJoltTransformerProvider + JSONObject firstJoltTransformer = new JSONObject(); + JSONObject firstJoltScript = new JSONObject(); + firstJoltScript.put("operation", "modify-overwrite-beta"); + firstJoltScript.put("spec", new JSONObject().put("index", new JSONObject().put("\\_index", newIndexName))); + firstJoltTransformer.put("JsonJoltTransformerProvider", new JSONObject().put("script", firstJoltScript)); + transformerList.put(firstJoltTransformer); + + jsonConditionalTransformerProvider.put(transformerList); + firstObject.put("JsonConditionalTransformerProvider", jsonConditionalTransformerProvider); + rootArray.put(firstObject); + return rootArray.toString(); + } + + @SneakyThrows + private static int runProcessAgainstTarget( + Path tempDirSnapshot, + Path tempDirLucene, + SearchClusterContainer targetContainer, + String transformations + ) + { + String targetAddress = targetContainer.getUrl(); + + int timeoutSeconds = 30; + ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, transformations); + + var process = runAndMonitorProcess(processBuilder); + boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); + if (!finished) { + log.atError().setMessage("Process timed out, attempting to kill it...").log(); + process.destroy(); // Try to be nice about things first... + if (!process.waitFor(10, TimeUnit.SECONDS)) { + log.atError().setMessage("Process still running, attempting to force kill it...").log(); + process.destroyForcibly(); + } + Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds)."); + } + + return process.exitValue(); + } + + + @NotNull + private static ProcessBuilder setupProcess( + Path tempDirSnapshot, + Path tempDirLucene, + String targetAddress, + String transformations + ) { + String classpath = System.getProperty("java.class.path"); + String javaHome = System.getProperty("java.home"); + String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; + + String[] args = { + "--snapshot-name", + SNAPSHOT_NAME, + "--snapshot-local-dir", + tempDirSnapshot.toString(), + "--lucene-dir", + tempDirLucene.toString(), + "--target-host", + targetAddress, + "--documents-per-bulk-request", + "5", + "--max-connections", + "4", + "--source-version", + "ES_7_10", + "--doc-transformer-config", + transformations, + }; + + // Kick off the doc migration process + log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}") + .addArgument(() -> Arrays.toString(args)) + .log(); + ProcessBuilder processBuilder = new ProcessBuilder( + javaExecutable, + "-cp", + classpath, + "org.opensearch.migrations.RfsMigrateDocuments" + ); + processBuilder.command().addAll(Arrays.asList(args)); + processBuilder.redirectErrorStream(true); + processBuilder.redirectOutput(); + return processBuilder; + } + + private static void validateFinalClusterDocs( + SearchClusterContainer esSourceContainer, + SearchClusterContainer osTargetContainer, + DocumentMigrationTestContext context, + Map expectedSourceDocs, + Map expectedTargetDocs + ) { + var targetClient = new RestClient(ConnectionContextTestParams.builder() + .host(osTargetContainer.getUrl()) + .build() + .toConnectionContext() + ); + var sourceClient = new RestClient(ConnectionContextTestParams.builder() + .host(esSourceContainer.getUrl()) + .build() + .toConnectionContext() + ); + + var requests = new SearchClusterRequests(context); + var sourceMap = requests.getMapOfIndexAndDocCount(sourceClient); + var refreshResponse = targetClient.get("_refresh", context.createUnboundRequestContext()); + Assertions.assertEquals(200, refreshResponse.statusCode); + var targetMap = requests.getMapOfIndexAndDocCount(targetClient); + + MatcherAssert.assertThat(sourceMap, Matchers.equalTo(expectedSourceDocs)); + MatcherAssert.assertThat(targetMap, Matchers.equalTo(expectedTargetDocs)); + } + +} diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py index 63eea5bcd..84bf57c99 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/common_operations.py @@ -82,6 +82,7 @@ def create_index(index_name: str, cluster: Cluster, **kwargs): return execute_api_call(cluster=cluster, method=HttpMethod.PUT, path=f"/{index_name}", headers=headers, **kwargs) + def get_index(index_name: str, cluster: Cluster, **kwargs): return execute_api_call(cluster=cluster, method=HttpMethod.GET, path=f"/{index_name}", **kwargs) @@ -222,9 +223,11 @@ def wait_for_running_replayer(replayer: Replayer, else: raise ReplayerNotActiveError(error_message) + def convert_transformations_to_str(transform_list: List[Dict]) -> str: return json.dumps(transform_list) + def get_index_name_transformation(existing_index_name: str, target_index_name: str) -> Dict: return { "JsonConditionalTransformerProvider": [ @@ -246,4 +249,4 @@ def get_index_name_transformation(existing_index_name: str, target_index_name: s } ] ] - } \ No newline at end of file + } diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py index 94d147a96..df33d9091 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py @@ -151,7 +151,8 @@ def test_e2e_0001_default(self): wait_for_running_replayer(replayer=replayer) expected_source_docs[index_name] = {"count": 3} - expected_target_docs[transformed_index_name] = {"count": 3} + # TODO Replayer transformation needed to only have docs in the transformed index + expected_target_docs[index_name] = {"count": 2} check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, index_prefix_ignore_list=ignore_list, test_case=self) check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, diff --git a/commonDependencyVersionConstraints/build.gradle b/commonDependencyVersionConstraints/build.gradle index f785f7e23..9f41ed86a 100644 --- a/commonDependencyVersionConstraints/build.gradle +++ b/commonDependencyVersionConstraints/build.gradle @@ -119,6 +119,9 @@ dependencies { api group: 'org.semver4j', name: 'semver4j', version: '5.3.0' + def json = '20240303' + api group: 'org.json', name: 'json', version: json + // ************************************************************ // The following constraints are for mitigating transitive CVEs // ************************************************************ diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index 8f32f0da9..8fe66f213 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -32,21 +32,21 @@ def call(Map config = [:]) { ] def rfsJsonString = JsonOutput.toJson(rfsJsonTransformations) def rfsTransformersArg = rfsJsonString.bytes.encodeBase64().toString() - def replayerJsonTransformations = [ - [ - "JsonJMESPathTransformerProvider": [ - "script": [ - "transformerMessageVersion": "transformerMessageVersion", - "URI": "replace(URI, 'test_e2e_0001_$testUniqueId', 'test_e2e_0001_${testUniqueId}_transformed')", - "method": "method", - "headers": "headers", - "payload": "payload" - ] - ] - ] - ] - def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) - def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() +// def replayerJsonTransformations = [ +// [ +// "JsonJMESPathTransformerProvider": [ +// "script": [ +// "transformerMessageVersion": "transformerMessageVersion", +// "URI": "replace(URI, 'test_e2e_0001_$testUniqueId', 'test_e2e_0001_${testUniqueId}_transformed')", +// "method": "method", +// "headers": "headers", +// "payload": "payload" +// ] +// ] +// ] +// ] +// def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) +// def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() def source_cdk_context = """ { "source-single-node-ec2": { @@ -83,7 +83,7 @@ def call(Map config = [:]) { "captureProxyServiceEnabled": true, "targetClusterProxyServiceEnabled": true, "trafficReplayerServiceEnabled": true, - "trafficReplayerExtraArgs": "--speedup-factor 10.0 --transformer-config-encoded $replayerTransformersArg", + "trafficReplayerExtraArgs": "--speedup-factor 10.0", "reindexFromSnapshotServiceEnabled": true, "reindexFromSnapshotExtraArgs": "--doc-transformer-config-base64 $rfsTransformersArg", "sourceCluster": { From 552d7f72bffc3f76c714ab113b9931e5469e26f6 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Fri, 6 Dec 2024 08:48:11 -0600 Subject: [PATCH 13/16] Fix doc count Signed-off-by: Tanner Lewis --- .../migrationConsole/lib/integ_test/integ_test/full_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py index df33d9091..192125f9b 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/full_tests.py @@ -152,7 +152,7 @@ def test_e2e_0001_default(self): expected_source_docs[index_name] = {"count": 3} # TODO Replayer transformation needed to only have docs in the transformed index - expected_target_docs[index_name] = {"count": 2} + expected_target_docs[index_name] = {"count": 3} check_doc_counts_match(cluster=source_cluster, expected_index_details=expected_source_docs, index_prefix_ignore_list=ignore_list, test_case=self) check_doc_counts_match(cluster=target_cluster, expected_index_details=expected_target_docs, From c5fba6cc38c26a526836da515e422ca7066c41c7 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Fri, 6 Dec 2024 11:09:31 -0600 Subject: [PATCH 14/16] Minor naming and cleanup Signed-off-by: Tanner Lewis --- .../bulkload/CustomTransformationTest.java | 6 +----- vars/fullES68SourceE2ETest.groovy | 15 --------------- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java index 09cbed807..78a780b15 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java @@ -49,7 +49,7 @@ private static class RunData { } @Test - public void testProcessExitsAsExpected() { + public void testCustomTransformationProducesDesiredTargetClusterState() { String nameTransformation = createIndexNameTransformation("geonames", "geonames_transformed"); var expectedSourceMap = new HashMap(); expectedSourceMap.put("geonames", 1); @@ -122,15 +122,11 @@ private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCo esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); int exitCode; - int initialExitCodeCount = 0; int finalExitCodeCount = 0; int runs = 0; do { exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, osTargetContainer)); runs++; - if (exitCode == initialExitCode) { - initialExitCodeCount++; - } if (exitCode == eventualExitCode) { finalExitCodeCount++; } diff --git a/vars/fullES68SourceE2ETest.groovy b/vars/fullES68SourceE2ETest.groovy index 8fe66f213..ee5fa7734 100644 --- a/vars/fullES68SourceE2ETest.groovy +++ b/vars/fullES68SourceE2ETest.groovy @@ -32,21 +32,6 @@ def call(Map config = [:]) { ] def rfsJsonString = JsonOutput.toJson(rfsJsonTransformations) def rfsTransformersArg = rfsJsonString.bytes.encodeBase64().toString() -// def replayerJsonTransformations = [ -// [ -// "JsonJMESPathTransformerProvider": [ -// "script": [ -// "transformerMessageVersion": "transformerMessageVersion", -// "URI": "replace(URI, 'test_e2e_0001_$testUniqueId', 'test_e2e_0001_${testUniqueId}_transformed')", -// "method": "method", -// "headers": "headers", -// "payload": "payload" -// ] -// ] -// ] -// ] -// def replayerJsonString = JsonOutput.toJson(replayerJsonTransformations) -// def replayerTransformersArg = replayerJsonString.bytes.encodeBase64().toString() def source_cdk_context = """ { "source-single-node-ec2": { From b1892827ccedb512844fbb835efc8d60406e60d4 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 12 Dec 2024 15:48:41 -0600 Subject: [PATCH 15/16] Testing refactoring and addressing PR comments Signed-off-by: Tanner Lewis --- ....java => CustomRfsTransformationTest.java} | 201 ++++++------------ .../bulkload/LeaseExpirationTest.java | 89 +++----- .../migrations/bulkload/SourceTestBase.java | 48 +++++ 3 files changed, 141 insertions(+), 197 deletions(-) rename DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/{CustomTransformationTest.java => CustomRfsTransformationTest.java} (57%) diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomRfsTransformationTest.java similarity index 57% rename from DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java rename to DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomRfsTransformationTest.java index 78a780b15..06c84e071 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomTransformationTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/CustomRfsTransformationTest.java @@ -1,14 +1,13 @@ package org.opensearch.migrations.bulkload; -import java.io.File; import java.nio.file.Files; -import java.nio.file.Path; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.stream.Stream; import org.opensearch.migrations.CreateSnapshot; import org.opensearch.migrations.bulkload.common.RestClient; @@ -19,13 +18,10 @@ import org.opensearch.migrations.reindexer.tracing.DocumentMigrationTestContext; import org.opensearch.migrations.snapshot.creation.tracing.SnapshotTestContext; -import lombok.AllArgsConstructor; -import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; -import org.jetbrains.annotations.NotNull; import org.json.JSONArray; import org.json.JSONObject; import org.junit.jupiter.api.Assertions; @@ -33,21 +29,14 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.Network; + @Slf4j @Tag("isolatedTest") -public class CustomTransformationTest extends SourceTestBase { +public class CustomRfsTransformationTest extends SourceTestBase { public static final String TARGET_DOCKER_HOSTNAME = "target"; public static final String SNAPSHOT_NAME = "test_snapshot"; - @AllArgsConstructor - @Getter - private static class RunData { - Path tempDirSnapshot; - Path tempDirLucene; - SearchClusterContainer targetContainer; - } - @Test public void testCustomTransformationProducesDesiredTargetClusterState() { String nameTransformation = createIndexNameTransformation("geonames", "geonames_transformed"); @@ -55,23 +44,47 @@ public void testCustomTransformationProducesDesiredTargetClusterState() { expectedSourceMap.put("geonames", 1); var expectedTargetMap = new HashMap(); expectedTargetMap.put("geonames_transformed", 1); - // 2 Shards, for each shard, expect three status code 2 and one status code 0 - int shards = 2; - int migrationProcessesPerShard = 4; - int continueExitCode = 2; - int finalExitCodePerShard = 0; - runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards, - finalExitCodePerShard, shards, expectedSourceMap, expectedTargetMap, - d -> runProcessAgainstTarget(d.tempDirSnapshot, d.tempDirLucene, d.targetContainer, nameTransformation - )); + String[] transformationArgs = { + "--doc-transformer-config", + nameTransformation, + }; + int totalSourceShards = 1; + Consumer loadDataIntoSource = cluster -> { + // Number of default shards is different across different versions on ES/OS. + // So we explicitly set it. + String body = String.format( + "{" + + " \"settings\": {" + + " \"index\": {" + + " \"number_of_shards\": %d," + + " \"number_of_replicas\": 0" + + " }" + + " }" + + "}", + totalSourceShards + ); + cluster.createIndex("geonames", body); + cluster.createDocument("geonames", "111", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}"); + }; + runTestProcess( + transformationArgs, + expectedSourceMap, + expectedTargetMap, + loadDataIntoSource, + totalSourceShards, + SourceTestBase::runProcessAgainstTarget + ); } @SneakyThrows - private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCodes, - int eventualExitCode, int eventualExitCodeCount, - Map expectedSourceDocs, - Map expectedTargetDocs, - Function processRunner) { + private void runTestProcess( + String[] transformationArgs, + Map expectedSourceDocs, + Map expectedTargetDocs, + Consumer preloadDataOperations, + Integer numberOfShards, + Function processRunner) + { final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); var tempDirSnapshot = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_snapshot"); @@ -92,23 +105,7 @@ private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCo ).join(); var sourceClusterOperations = new ClusterOperations(esSourceContainer.getUrl()); - - var shards = 2; - // Number of default shards is different across different versions on ES/OS. - // So we explicitly set it. - String body = String.format( - "{" + - " \"settings\": {" + - " \"index\": {" + - " \"number_of_shards\": %d," + - " \"number_of_replicas\": 0" + - " }" + - " }" + - "}", - shards - ); - sourceClusterOperations.createIndex("geonames", body); - sourceClusterOperations.createDocument("geonames", "111", "{\"author\":\"Tobias Funke\", \"category\": \"cooking\"}"); + preloadDataOperations.accept(sourceClusterOperations); // Create the snapshot from the source cluster var args = new CreateSnapshot.Args(); @@ -118,22 +115,31 @@ private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCo var snapshotCreator = new CreateSnapshot(args, testSnapshotContext.createSnapshotCreateContext()); snapshotCreator.run(); - esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); - int exitCode; - int finalExitCodeCount = 0; - int runs = 0; - do { - exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, osTargetContainer)); - runs++; - if (exitCode == eventualExitCode) { - finalExitCodeCount++; - } + String[] processArgs = { + "--snapshot-name", + SNAPSHOT_NAME, + "--snapshot-local-dir", + tempDirSnapshot.toString(), + "--lucene-dir", + tempDirLucene.toString(), + "--target-host", + osTargetContainer.getUrl(), + "--documents-per-bulk-request", + "5", + "--max-connections", + "4", + "--source-version", + "ES_7_10" + }; + String[] completeArgs = Stream.concat(Arrays.stream(processArgs), Arrays.stream(transformationArgs)).toArray(String[]::new); + + // Perform RFS process for each shard + for(int i = 0; i < numberOfShards; i++) { + int exitCode = processRunner.apply(completeArgs); log.atInfo().setMessage("Process exited with code: {}").addArgument(exitCode).log(); - // Clean tree for subsequent run - deleteTree(tempDirLucene); - } while (finalExitCodeCount < eventualExitCodeCount && runs < initialExitCodes * 2); + } // Assert doc count on the source and target cluster match expected validateFinalClusterDocs( @@ -148,6 +154,8 @@ private void runTestProcessWithCheckpoint(int initialExitCode, int initialExitCo } } + // Create a simple Jolt transform which matches documents of a given index name in a snpahost and changes that + // index name to a desired index name when migrated to the target cluster private static String createIndexNameTransformation(String existingIndexName, String newIndexName) { JSONArray rootArray = new JSONArray(); JSONObject firstObject = new JSONObject(); @@ -176,81 +184,6 @@ private static String createIndexNameTransformation(String existingIndexName, St return rootArray.toString(); } - @SneakyThrows - private static int runProcessAgainstTarget( - Path tempDirSnapshot, - Path tempDirLucene, - SearchClusterContainer targetContainer, - String transformations - ) - { - String targetAddress = targetContainer.getUrl(); - - int timeoutSeconds = 30; - ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, transformations); - - var process = runAndMonitorProcess(processBuilder); - boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); - if (!finished) { - log.atError().setMessage("Process timed out, attempting to kill it...").log(); - process.destroy(); // Try to be nice about things first... - if (!process.waitFor(10, TimeUnit.SECONDS)) { - log.atError().setMessage("Process still running, attempting to force kill it...").log(); - process.destroyForcibly(); - } - Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds)."); - } - - return process.exitValue(); - } - - - @NotNull - private static ProcessBuilder setupProcess( - Path tempDirSnapshot, - Path tempDirLucene, - String targetAddress, - String transformations - ) { - String classpath = System.getProperty("java.class.path"); - String javaHome = System.getProperty("java.home"); - String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; - - String[] args = { - "--snapshot-name", - SNAPSHOT_NAME, - "--snapshot-local-dir", - tempDirSnapshot.toString(), - "--lucene-dir", - tempDirLucene.toString(), - "--target-host", - targetAddress, - "--documents-per-bulk-request", - "5", - "--max-connections", - "4", - "--source-version", - "ES_7_10", - "--doc-transformer-config", - transformations, - }; - - // Kick off the doc migration process - log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}") - .addArgument(() -> Arrays.toString(args)) - .log(); - ProcessBuilder processBuilder = new ProcessBuilder( - javaExecutable, - "-cp", - classpath, - "org.opensearch.migrations.RfsMigrateDocuments" - ); - processBuilder.command().addAll(Arrays.asList(args)); - processBuilder.redirectErrorStream(true); - processBuilder.redirectOutput(); - return processBuilder; - } - private static void validateFinalClusterDocs( SearchClusterContainer esSourceContainer, SearchClusterContainer osTargetContainer, diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java index 4b1d30f97..17ac9f746 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java @@ -1,9 +1,6 @@ package org.opensearch.migrations.bulkload; -import java.io.File; import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -26,7 +23,6 @@ import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -45,8 +41,7 @@ public class LeaseExpirationTest extends SourceTestBase { @AllArgsConstructor @Getter private static class RunData { - Path tempDirSnapshot; - Path tempDirLucene; + String[] processArgs; ToxiProxyWrapper proxyContainer; } @@ -66,8 +61,8 @@ public void testProcessExitsAsExpected() { int finalExitCodePerShard = 0; runTestProcessWithCheckpoint(continueExitCode, (migrationProcessesPerShard - 1) * shards, finalExitCodePerShard, shards, shards, indexDocCount, - d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer - )); + d -> runProcessAgainstToxicTarget(d.processArgs, d.proxyContainer) + ); } @SneakyThrows @@ -139,12 +134,32 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); + String[] processArgs = { + "--snapshot-name", + SNAPSHOT_NAME, + "--snapshot-local-dir", + tempDirSnapshot.toString(), + "--lucene-dir", + tempDirLucene.toString(), + "--target-host", + proxyContainer.getProxyUriAsString(), + "--index-allowlist", + "geonames", + "--documents-per-bulk-request", + "5", + "--max-connections", + "4", + "--source-version", + "ES_7_10", + "--initial-lease-duration", + "PT20s" }; + int exitCode; int initialExitCodeCount = 0; int finalExitCodeCount = 0; int runs = 0; do { - exitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer)); + exitCode = processRunner.apply(new RunData(processArgs, proxyContainer)); runs++; if (exitCode == expectedInitialExitCode) { initialExitCodeCount++; @@ -185,19 +200,14 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec } @SneakyThrows - private static int runProcessAgainstToxicTarget( - Path tempDirSnapshot, - Path tempDirLucene, - ToxiProxyWrapper proxyContainer - ) + private static int runProcessAgainstToxicTarget(String[] processArgs, ToxiProxyWrapper proxyContainer) { - String targetAddress = proxyContainer.getProxyUriAsString(); var tp = proxyContainer.getProxy(); var latency = tp.toxics().latency("latency-toxic", ToxicDirection.UPSTREAM, 125); // Set to less than 2x lease time to ensure leases aren't doubling int timeoutSeconds = 30; - ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress); + ProcessBuilder processBuilder = setupProcess(processArgs); var process = runAndMonitorProcess(processBuilder); boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); @@ -216,51 +226,4 @@ private static int runProcessAgainstToxicTarget( return process.exitValue(); } - - @NotNull - private static ProcessBuilder setupProcess( - Path tempDirSnapshot, - Path tempDirLucene, - String targetAddress - ) { - String classpath = System.getProperty("java.class.path"); - String javaHome = System.getProperty("java.home"); - String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; - - String[] args = { - "--snapshot-name", - SNAPSHOT_NAME, - "--snapshot-local-dir", - tempDirSnapshot.toString(), - "--lucene-dir", - tempDirLucene.toString(), - "--target-host", - targetAddress, - "--index-allowlist", - "geonames", - "--documents-per-bulk-request", - "5", - "--max-connections", - "4", - "--source-version", - "ES_7_10", - "--initial-lease-duration", - "PT20s" }; - - // Kick off the doc migration process - log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}") - .addArgument(() -> Arrays.toString(args)) - .log(); - ProcessBuilder processBuilder = new ProcessBuilder( - javaExecutable, - "-cp", - classpath, - "org.opensearch.migrations.RfsMigrateDocuments" - ); - processBuilder.command().addAll(Arrays.asList(args)); - processBuilder.redirectErrorStream(true); - processBuilder.redirectOutput(); - return processBuilder; - } - } diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java index c36203c29..431615145 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java @@ -2,16 +2,19 @@ import java.io.BufferedReader; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; import java.time.Duration; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Random; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -96,6 +99,51 @@ protected static Process runAndMonitorProcess(ProcessBuilder processBuilder) thr return process; } + @SneakyThrows + protected static int runProcessAgainstTarget(String[] processArgs) + { + int timeoutSeconds = 30; + ProcessBuilder processBuilder = setupProcess(processArgs); + + var process = runAndMonitorProcess(processBuilder); + boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); + if (!finished) { + log.atError().setMessage("Process timed out, attempting to kill it...").log(); + process.destroy(); // Try to be nice about things first... + if (!process.waitFor(10, TimeUnit.SECONDS)) { + log.atError().setMessage("Process still running, attempting to force kill it...").log(); + process.destroyForcibly(); + } + Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds)."); + } + + return process.exitValue(); + } + + + @NotNull + protected static ProcessBuilder setupProcess(String[] processArgs) { + String classpath = System.getProperty("java.class.path"); + String javaHome = System.getProperty("java.home"); + String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; + + // Kick off the doc migration process + log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}") + .addArgument(() -> Arrays.toString(processArgs)) + .log(); + ProcessBuilder processBuilder = new ProcessBuilder( + javaExecutable, + "-cp", + classpath, + "org.opensearch.migrations.RfsMigrateDocuments" + ); + processBuilder.command().addAll(Arrays.asList(processArgs)); + processBuilder.redirectErrorStream(true); + processBuilder.redirectOutput(); + return processBuilder; + } + + @AllArgsConstructor public static class ExpectedMigrationWorkTerminationException extends RuntimeException { public final RfsMigrateDocuments.NoWorkLeftException exception; From 0e2f4530963a685fbe4f7288244a01022f742996 Mon Sep 17 00:00:00 2001 From: Tanner Lewis Date: Thu, 12 Dec 2024 16:11:04 -0600 Subject: [PATCH 16/16] Handle changes from main and refactor further Signed-off-by: Tanner Lewis --- .../bulkload/LeaseExpirationTest.java | 0 .../bulkload/ProcessLifecycleTest.java | 95 +++---------------- .../migrations/bulkload/SourceTestBase.java | 10 +- 3 files changed, 17 insertions(+), 88 deletions(-) delete mode 100644 DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java deleted file mode 100644 index e69de29bb..000000000 diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index 8d11f7005..38488f0b8 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -1,12 +1,7 @@ package org.opensearch.migrations.bulkload; -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; -import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -27,7 +22,6 @@ import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -124,8 +118,8 @@ private void testProcess(int expectedExitCode, Function proces var proxyContainer = new ToxiProxyWrapper(network) ) { CompletableFuture.allOf( - CompletableFuture.runAsync(() -> esSourceContainer.start()), - CompletableFuture.runAsync(() -> osTargetContainer.start()), + CompletableFuture.runAsync(esSourceContainer::start), + CompletableFuture.runAsync(osTargetContainer::start), CompletableFuture.runAsync(() -> proxyContainer.start(TARGET_DOCKER_HOSTNAME, OPENSEARCH_PORT)) ).join(); @@ -180,36 +174,7 @@ private static int runProcessAgainstToxicTarget( } int timeoutSeconds = 90; - ProcessBuilder processBuilder = setupProcess(tempDirSnapshot, tempDirLucene, targetAddress, failHow); - - var process = runAndMonitorProcess(processBuilder); - boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); - if (!finished) { - log.atError().setMessage("Process timed out, attempting to kill it...").log(); - process.destroy(); // Try to be nice about things first... - if (!process.waitFor(10, TimeUnit.SECONDS)) { - log.atError().setMessage("Process still running, attempting to force kill it...").log(); - process.destroyForcibly(); - } - Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds)."); - } - - return process.exitValue(); - } - - - @NotNull - private static ProcessBuilder setupProcess( - Path tempDirSnapshot, - Path tempDirLucene, - String targetAddress, - FailHow failHow - ) { - String classpath = System.getProperty("java.class.path"); - String javaHome = System.getProperty("java.home"); - String javaExecutable = javaHome + File.separator + "bin" + File.separator + "java"; - - String[] args = { + String[] processArgs = { "--snapshot-name", SNAPSHOT_NAME, "--snapshot-local-dir", @@ -228,51 +193,21 @@ private static ProcessBuilder setupProcess( "ES_7_10", "--initial-lease-duration", failHow == FailHow.NEVER ? "PT10M" : "PT1S" }; + ProcessBuilder processBuilder = setupProcess(processArgs); - // Kick off the doc migration process - log.atInfo().setMessage("Running RfsMigrateDocuments with args: {}") - .addArgument(() -> Arrays.toString(args)) - .log(); - ProcessBuilder processBuilder = new ProcessBuilder( - javaExecutable, - "-cp", - classpath, - "org.opensearch.migrations.RfsMigrateDocuments" - ); - processBuilder.command().addAll(Arrays.asList(args)); - processBuilder.redirectErrorStream(true); - processBuilder.redirectOutput(); - return processBuilder; - } - - @NotNull - private static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException { - var process = processBuilder.start(); - - log.atInfo().setMessage("Process started with ID: {}").addArgument(() -> process.toHandle().pid()).log(); - - BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); - var readerThread = new Thread(() -> { - String line; - while (true) { - try { - if ((line = reader.readLine()) == null) break; - } catch (IOException e) { - log.atWarn().setCause(e).setMessage("Couldn't read next line from sub-process").log(); - return; - } - String finalLine = line; - log.atInfo() - .setMessage("from sub-process [{}]: {}") - .addArgument(() -> process.toHandle().pid()) - .addArgument(finalLine) - .log(); + var process = runAndMonitorProcess(processBuilder); + boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS); + if (!finished) { + log.atError().setMessage("Process timed out, attempting to kill it...").log(); + process.destroy(); // Try to be nice about things first... + if (!process.waitFor(10, TimeUnit.SECONDS)) { + log.atError().setMessage("Process still running, attempting to force kill it...").log(); + process.destroyForcibly(); } - }); + Assertions.fail("The process did not finish within the timeout period (" + timeoutSeconds + " seconds)."); + } - // Kill the process and fail if we have to wait too long - readerThread.start(); - return process; + return process.exitValue(); } } diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java index cbeca190e..49f9c54a0 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/SourceTestBase.java @@ -4,6 +4,7 @@ import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; @@ -47,23 +48,16 @@ import lombok.extern.slf4j.Slf4j; import org.hamcrest.MatcherAssert; import org.hamcrest.Matchers; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Assertions; import reactor.core.publisher.Flux; @Slf4j public class SourceTestBase { - public static final String GENERATOR_BASE_IMAGE = "migrations/elasticsearch_client_test_console:latest"; public static final int MAX_SHARD_SIZE_BYTES = 64 * 1024 * 1024; public static final String SOURCE_SERVER_ALIAS = "source"; public static final long TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 3600; - protected static Object[] makeParamsForBase(SearchClusterContainer.ContainerVersion baseSourceImage) { - return new Object[]{ - baseSourceImage, - GENERATOR_BASE_IMAGE, - new String[]{"/root/runTestBenchmarks.sh", "--endpoint", "http://" + SOURCE_SERVER_ALIAS + ":9200/"}}; - } - @NotNull protected static Process runAndMonitorProcess(ProcessBuilder processBuilder) throws IOException { var process = processBuilder.start();