diff --git a/FetchMigration/Dockerfile b/FetchMigration/Dockerfile
index e823d55f2..e1e97edac 100644
--- a/FetchMigration/Dockerfile
+++ b/FetchMigration/Dockerfile
@@ -19,5 +19,4 @@ RUN echo "ssl: false" > $DATA_PREPPER_PATH/config/data-prepper-config.yaml
 RUN echo "metricRegistries: [Prometheus]" >> $DATA_PREPPER_PATH/config/data-prepper-config.yaml
 
 # Include the -u flag to have unbuffered stdout (for detached mode)
-# "$@" allows passing extra args/flags to the entrypoint when the image is run
-ENTRYPOINT python3 -u ./fetch_orchestrator.py --insecure $DATA_PREPPER_PATH $FM_CODE_PATH/input.yaml "$@"
+ENTRYPOINT ["python3", "-u", "./fetch_orchestrator.py", "--insecure", "$DATA_PREPPER_PATH", "$FM_CODE_PATH/input.yaml"]
diff --git a/FetchMigration/python/fetch_orchestrator.py b/FetchMigration/python/fetch_orchestrator.py
index d671cce25..ef165e4f0 100644
--- a/FetchMigration/python/fetch_orchestrator.py
+++ b/FetchMigration/python/fetch_orchestrator.py
@@ -93,7 +93,9 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
                                                         report=True, dryrun=params.is_dry_run)
     logging.info("Running metadata migration...\n")
     metadata_migration_result = metadata_migration.run(metadata_migration_params)
-    if len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration():
+    if metadata_migration_result.target_doc_count == 0:
+        logging.warning("Target document count is zero, skipping data migration...")
+    elif len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration():
         # Kick off a subprocess for Data Prepper
         logging.info("Running Data Prepper...\n")
         proc = subprocess.Popen(dp_exec_path)
@@ -143,7 +145,8 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
     elif not os.path.exists(dp_path + __DP_EXECUTABLE_SUFFIX):
         raise ValueError(f"Could not find {__DP_EXECUTABLE_SUFFIX} executable under Data Prepper install location")
     pipeline_file = os.path.expandvars(cli_args.pipeline_file_path)
-    if not os.path.exists(pipeline_file):
+    # Raise error if pipeline file is neither on disk nor provided inline
+    if __get_env_string("INLINE_PIPELINE") is None and not os.path.exists(pipeline_file):
         raise ValueError("Data Prepper pipeline file does not exist")
     params = FetchOrchestratorParams(dp_path, pipeline_file, port=cli_args.port, insecure=cli_args.insecure,
                                      dryrun=cli_args.dryrun, create_only=cli_args.createonly)
diff --git a/FetchMigration/python/metadata_migration.py b/FetchMigration/python/metadata_migration.py
index 48a8dd6ca..aaff51cc9 100644
--- a/FetchMigration/python/metadata_migration.py
+++ b/FetchMigration/python/metadata_migration.py
@@ -41,13 +41,13 @@ def write_output(yaml_data: dict, indices_to_migrate: set, output_path: str):
 
 def print_report(diff: IndexDiff, total_doc_count: int):  # pragma no cover
     logging.info("Identical indices in the target cluster: " + utils.string_from_set(diff.identical_indices))
-    logging.info("Identical empty indices in the target cluster (data will be migrated): " +
+    logging.info("Identical empty indices in the target cluster (will be migrated): " +
                  utils.string_from_set(diff.identical_empty_indices))
-    logging.info("Indices present in both clusters with conflicting settings/mappings (data will not be migrated): " +
+    logging.info("Indices present in both clusters with conflicting settings/mappings (will NOT be migrated): " +
                  utils.string_from_set(diff.conflicting_indices))
-    logging.info("Indices to be created in the target cluster (data will be migrated): " +
+    logging.info("Indices to be created in the target cluster (will be migrated): " +
                  utils.string_from_set(diff.indices_to_create))
-    logging.info("Total number of documents to be moved: " + str(total_doc_count))
+    logging.info("Target document count: " + str(total_doc_count))
 
 
 def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
diff --git a/FetchMigration/python/tests/test_fetch_orchestrator.py b/FetchMigration/python/tests/test_fetch_orchestrator.py
index a837bea4e..84ef7517e 100644
--- a/FetchMigration/python/tests/test_fetch_orchestrator.py
+++ b/FetchMigration/python/tests/test_fetch_orchestrator.py
@@ -8,6 +8,7 @@
 #
 
 import copy
+import logging
 import os
 import unittest
 from unittest import mock
@@ -23,6 +24,12 @@
 
 
 class TestFetchOrchestrator(unittest.TestCase):
+    # Run before each test
+    def setUp(self) -> None:
+        logging.disable(logging.CRITICAL)
+
+    def tearDown(self) -> None:
+        logging.disable(logging.NOTSET)
 
     @patch('migration_monitor.run')
     @patch('subprocess.Popen')
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile
index d29bd9e4d..9518cb2a1 100644
--- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/Dockerfile
@@ -12,15 +12,17 @@ RUN mkdir /root/kafka-tools/aws
 COPY runTestBenchmarks.sh /root/
 COPY humanReadableLogs.py /root/
 COPY catIndices.sh /root/
+COPY showFetchMigrationCommand.sh /root/
 COPY msk-iam-auth.properties /root/kafka-tools/aws
 COPY kafkaCmdRef.md /root/kafka-tools
 RUN chmod ug+x /root/runTestBenchmarks.sh
 RUN chmod ug+x /root/humanReadableLogs.py
 RUN chmod ug+x /root/catIndices.sh
+RUN chmod ug+x /root/showFetchMigrationCommand.sh
 WORKDIR /root/kafka-tools
 # Get kafka distribution and unpack to 'kafka'
 RUN wget -qO- https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz | tar --transform 's!^[^/]*!kafka!' -xvz
 RUN wget -O kafka/libs/msk-iam-auth.jar https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.9/aws-msk-iam-auth-1.1.9-all.jar
 WORKDIR /root
 
-CMD tail -f /dev/null
\ No newline at end of file
+CMD tail -f /dev/null
diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/showFetchMigrationCommand.sh b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/showFetchMigrationCommand.sh
new file mode 100755
index 000000000..3c34ae118
--- /dev/null
+++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/showFetchMigrationCommand.sh
@@ -0,0 +1,53 @@
+#!/bin/bash
+
+# Ensure Fetch Migration command is available before proceeding
+if [ -z "$FETCH_MIGRATION_COMMAND" ]; then
+  echo "Fetch Migration unavailable or not deployed, exiting..."
+  exit 1
+fi
+
+# ECS command overrides argument with a <FLAGS> placeholder for flags
+OVERRIDES_ARG="--overrides '{ \"containerOverrides\": [ { \"name\": \"fetch-migration\", \"command\": <FLAGS> }]}'"
+
+# Default values
+create_only=false
+dry_run=false
+
+while [[ $# -gt 0 ]]; do
+  key="$1"
+  case $key in
+    --create-only)
+      create_only=true
+      shift
+      ;;
+    --dryrun)
+      dry_run=true
+      shift
+      ;;
+    *)
+      shift
+      ;;
+  esac
+done
+
+# Build flags string
+flags=""
+if [ "$dry_run" = true ]; then
+  flags="--dryrun,${flags}"
+fi
+if [ "$create_only" = true ]; then
+  flags="--createonly,${flags}"
+fi
+
+command_to_run="$FETCH_MIGRATION_COMMAND"
+# Only add overrides suffix if any flags were specified
+if [ -n "$flags" ]; then
+  # Remove trailing ,
+  flags=${flags%?}
+  # Convert comma-separated flags to a JSON array string
+  flags=$(echo -n "$flags" | jq -cRs 'split(",")')
+  # Replace the <FLAGS> placeholder in the overrides string with the actual flags array
+  command_to_run="$command_to_run ${OVERRIDES_ARG/<FLAGS>/$flags}"
+fi
+
+echo "$command_to_run"