From d60125dcc070c1bd84b231a425043679a1e0aa36 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 2 Oct 2024 19:07:19 -0400 Subject: [PATCH 1/5] Run the RFS Container's DocumentMigration application repeatedly as long as it's successful. Unsuccessful means anything but a 0 exit code. The application has been cleaned up to return 3 for no work available. It used to return whatever a top-level exception would create. That part wasn't necessary, but it seemed like a good idea to test for that to make sure that after running repeatedly, processes would eventually do THAT instead of returning 0, causing infinite loops in the containers. Signed-off-by: Greg Schohn --- .../docker/entrypoint.sh | 5 +- .../migrations/RfsMigrateDocuments.java | 26 ++++++---- .../bulkload/ProcessLifecycleTest.java | 49 +++++++++++++++++-- .../integ_test/integ_test/backfill_tests.py | 3 +- 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/DocumentsFromSnapshotMigration/docker/entrypoint.sh b/DocumentsFromSnapshotMigration/docker/entrypoint.sh index 7deb44da9..32473136d 100755 --- a/DocumentsFromSnapshotMigration/docker/entrypoint.sh +++ b/DocumentsFromSnapshotMigration/docker/entrypoint.sh @@ -43,5 +43,6 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then fi fi -echo "Executing RFS Command" -eval $RFS_COMMAND +[ -z "$RFS_COMMAND" ] && \ +{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \ +until ! 
{ echo "Running command $RFS_COMMAND"; eval "$RFS_COMMAND"; }; do :; done \ No newline at end of file diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java index e50369a53..592c09dd1 100644 --- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java +++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java @@ -45,7 +45,8 @@ @Slf4j public class RfsMigrateDocuments { - public static final int PROCESS_TIMED_OUT = 2; + public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2; + public static final int NO_WORK_LEFT_EXIT_CODE = 3; public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5; public static final String LOGGING_MDC_WORKER_ID = "workerId"; @@ -184,15 +185,12 @@ public static void main(String[] args) throws Exception { var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? 
Paths.get(arguments.snapshotLocalDir) : null; var connectionContext = arguments.targetArgs.toConnectionContext(); - try (var processManager = new LeaseExpireTrigger(workItemId -> { - log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId); - System.exit(PROCESS_TIMED_OUT); - }, Clock.systemUTC()); - var workCoordinator = new OpenSearchWorkCoordinator( - new CoordinateWorkHttpClient(connectionContext), - TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, - workerId - )) { + try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC()); + var workCoordinator = new OpenSearchWorkCoordinator( + new CoordinateWorkHttpClient(connectionContext), + TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS, + workerId) + ) { MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main OpenSearchClient targetClient = new OpenSearchClient(connectionContext); DocumentReindexer reindexer = new DocumentReindexer(targetClient, @@ -233,12 +231,20 @@ public static void main(String[] args) throws Exception { unpackerFactory, arguments.maxShardSizeBytes, context); + } catch (NoWorkLeftException e) { + log.atWarn().setMessage("No work left to acquire. 
Exiting with error code to signal that.").log(); + System.exit(NO_WORK_LEFT_EXIT_CODE); } catch (Exception e) { log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log(); throw e; } } + private static void exitOnLeaseTimeout(String workItemId) { + log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId); + System.exit(PROCESS_TIMED_OUT_EXIT_CODE); + } + private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) { var compositeContextTracker = new CompositeContextTracker( new ActiveContextTracker(), diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index b577ddd3e..ceecdfd54 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -10,9 +10,14 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -47,6 +52,37 @@ enum FailHow { WITH_DELAYS } + @AllArgsConstructor + @Getter + private static class RunData { + Path tempDirSnapshot; + Path tempDirLucene; + ToxiProxyWrapper proxyContainer; + } + + @Test + @Tag("longTest") + public void testExitsZeroThenThreeForSimpleSetup() throws Exception { + testProcess(3, + d -> { + var firstExitCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + 
Assertions.assertEquals(0, firstExitCode); + for (int i=0; i<10; ++i) { + var secondExitCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + if (secondExitCode != 0) { + var lastErrorCode = + runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER); + Assertions.assertEquals(secondExitCode, lastErrorCode); + return lastErrorCode; + } + } + Assertions.fail("Ran for many test iterations and didn't get a No Work Available exit code"); + return -1; // won't be evaluated + }); + } + @ParameterizedTest @CsvSource(value = { // This test will go through a proxy that doesn't add any defects and the process will use defaults @@ -62,6 +98,12 @@ enum FailHow { "WITH_DELAYS, 2" }) public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception { final var failHow = FailHow.valueOf(failAfterString); + testProcess(expectedExitCode, + d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, failHow)); + } + + @SneakyThrows + private void testProcess(int expectedExitCode, Function processRunner) { final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking(); var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2); @@ -108,7 +150,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC esSourceContainer.copySnapshotData(tempDirSnapshot.toString()); - int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow); + int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer)); log.atInfo().setMessage("Process exited with code: " + actualExitCode).log(); // Check if the exit code is as expected @@ -123,12 +165,13 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC } } + @SneakyThrows private static int runProcessAgainstToxicTarget( Path 
tempDirSnapshot, Path tempDirLucene, ToxiProxyWrapper proxyContainer, - FailHow failHow - ) throws IOException, InterruptedException { + FailHow failHow) + { String targetAddress = proxyContainer.getProxyUriAsString(); var tp = proxyContainer.getProxy(); if (failHow == FailHow.AT_STARTUP) { diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py index a95dc9f8d..70f53b4e1 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/lib/integ_test/integ_test/backfill_tests.py @@ -62,7 +62,8 @@ def setup_backfill(request): assert metadata_result.success backfill_start_result: CommandResult = backfill.start() assert backfill_start_result.success - backfill_scale_result: CommandResult = backfill.scale(units=10) + # small enough to allow containers to be reused, big enough to test scaling out + backfill_scale_result: CommandResult = backfill.scale(units=2) assert backfill_scale_result.success From 69f38445eadb251f1270649ac2a8419260d775a7 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 2 Oct 2024 19:13:22 -0400 Subject: [PATCH 2/5] Spotless correction. I would love to induce somebody to fix the spotless specification so that all 3rd party imports are in the same alphabetical block. 
Signed-off-by: Greg Schohn --- .../migrations/bulkload/ProcessLifecycleTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java index ceecdfd54..481e144dc 100644 --- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java +++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/ProcessLifecycleTest.java @@ -12,9 +12,6 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.SneakyThrows; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -29,6 +26,9 @@ import org.opensearch.testcontainers.OpensearchContainer; import eu.rekawek.toxiproxy.model.ToxicDirection; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.testcontainers.containers.Network; From 63216c5b2b15791d25e65758d1936ec8f25b6fcd Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Fri, 4 Oct 2024 17:42:04 -0400 Subject: [PATCH 3/5] DocumentMigration entrypoint - Stop looping if we're eating up more disk space. 
Signed-off-by: Greg Schohn --- .../docker/entrypoint.sh | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/DocumentsFromSnapshotMigration/docker/entrypoint.sh b/DocumentsFromSnapshotMigration/docker/entrypoint.sh index 32473136d..3181e803f 100755 --- a/DocumentsFromSnapshotMigration/docker/entrypoint.sh +++ b/DocumentsFromSnapshotMigration/docker/entrypoint.sh @@ -43,6 +43,20 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then fi fi +# Monitor how much aggregate free space is left, if we drop below a certain amount, +# don't allow another process to run again +space_drop_fraction=4 +initial_free_space=$(df --output=avail | awk 'NR>1 {sum+=$1} END {print sum}') +still_enough_space_left() { + current_free=$(df --output=avail | awk 'NR>1 {sum+=$1} END {print sum}') + if (((initial_free_space - current_free) >= (initial_free_space / $space_drop_fraction))); then + return 1; + else + return 0; + fi +} + + [ -z "$RFS_COMMAND" ] && \ { echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \ -until ! { echo "Running command $RFS_COMMAND"; eval "$RFS_COMMAND"; }; do :; done \ No newline at end of file +until ! 
{ echo "Running command $RFS_COMMAND"; eval "$RFS_COMMAND" && still_enough_space_left; }; do :; done From 5313d16c54d4a739437fd0f1ff327ce287e8a982 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Mon, 7 Oct 2024 14:31:01 -0500 Subject: [PATCH 4/5] Use sed for command line parsing in rfs entrypoint for cleanup directories Signed-off-by: Andre Kurait --- .../docker/entrypoint.sh | 51 ++++++++++++++----- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/DocumentsFromSnapshotMigration/docker/entrypoint.sh b/DocumentsFromSnapshotMigration/docker/entrypoint.sh index 3181e803f..94ab05cfb 100755 --- a/DocumentsFromSnapshotMigration/docker/entrypoint.sh +++ b/DocumentsFromSnapshotMigration/docker/entrypoint.sh @@ -43,20 +43,47 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then fi fi -# Monitor how much aggregate free space is left, if we drop below a certain amount, -# don't allow another process to run again -space_drop_fraction=4 -initial_free_space=$(df --output=avail | awk 'NR>1 {sum+=$1} END {print sum}') -still_enough_space_left() { - current_free=$(df --output=avail | awk 'NR>1 {sum+=$1} END {print sum}') - if (((initial_free_space - current_free) >= (initial_free_space / $space_drop_fraction))); then - return 1; - else - return 0; - fi +# Extract the value passed after --s3-local-dir +S3_LOCAL_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--s3-local-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"') +# Extract the value passed after --lucene-dir +LUCENE_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--lucene-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"') +if [[ -n "$S3_LOCAL_DIR" ]]; then + echo "Will delete S3 local directory between runs: $S3_LOCAL_DIR" + rm -rf "$S3_LOCAL_DIR" + echo "Directory $S3_LOCAL_DIR has been deleted." +else + echo "--s3-local-dir argument not found in RFS_COMMAND. Will not delete S3 local directory between runs." 
+fi + +if [[ -n "$LUCENE_DIR" ]]; then + echo "Will delete lucene local directory between runs: $LUCENE_DIR" +else + echo "--lucene-dir argument not found in RFS_COMMAND. This is required." + exit 1 +fi + +cleanup_directories() { + if [[ -n "$S3_LOCAL_DIR" ]]; then + echo "Cleaning up S3 local directory: $S3_LOCAL_DIR" + rm -rf "$S3_LOCAL_DIR" + echo "Directory $S3_LOCAL_DIR has been cleaned up." + fi + + if [[ -n "$LUCENE_DIR" ]]; then + echo "Cleaning up Lucene local directory: $LUCENE_DIR" + rm -rf "$LUCENE_DIR" + echo "Directory $LUCENE_DIR has been cleaned up." + fi } + [ -z "$RFS_COMMAND" ] && \ { echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \ -until ! { echo "Running command $RFS_COMMAND"; eval "$RFS_COMMAND" && still_enough_space_left; }; do :; done +until ! { + echo "Running command $RFS_COMMAND" + eval "$RFS_COMMAND" +}; do + echo "Cleaning up directories before the next run." + cleanup_directories +done From a1acdc1b901f55c842afdb496a9102527051998d Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Mon, 7 Oct 2024 16:20:53 -0500 Subject: [PATCH 5/5] Remove s3 pre-delete Signed-off-by: Andre Kurait --- DocumentsFromSnapshotMigration/docker/entrypoint.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/DocumentsFromSnapshotMigration/docker/entrypoint.sh b/DocumentsFromSnapshotMigration/docker/entrypoint.sh index 94ab05cfb..407277328 100755 --- a/DocumentsFromSnapshotMigration/docker/entrypoint.sh +++ b/DocumentsFromSnapshotMigration/docker/entrypoint.sh @@ -49,8 +49,6 @@ S3_LOCAL_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--s3-local-dir\s\+\("[^"]\+"\|[ LUCENE_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--lucene-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"') if [[ -n "$S3_LOCAL_DIR" ]]; then echo "Will delete S3 local directory between runs: $S3_LOCAL_DIR" - rm -rf "$S3_LOCAL_DIR" - echo "Directory $S3_LOCAL_DIR has been deleted." else echo "--s3-local-dir argument not found in RFS_COMMAND. Will not delete S3 local directory between runs." fi