[Fetch Migration] Usability updates and bugfixes (#451)
This change includes a few usability improvements and some bugfixes:

* (Bugfix) fetch_orchestrator.py now skips the check for a Data Prepper pipeline file on disk when an INLINE_PIPELINE value is present
* The Fetch Migration Dockerfile entrypoint has been updated to use the exec form, which allows command-line arguments (such as flags) to be passed to it
* A wrapper script (showFetchMigrationCommand.sh) has been added that provides an easy way to supply flags (such as --create-only or --dryrun) to the ECS task. The script does not run the AWS command - it only prints it for the user to copy and run themselves (see the usage sketch after this list)
* The messaging in the metadata migration report has been tweaked to reduce confusion for runs where no data is migrated
* Added handling for the scenario where the target document count is zero
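A minimal usage sketch for the wrapper script follows. The FETCH_MIGRATION_COMMAND value shown is a placeholder (in the deployed console it is already set), and the aws ecs run-task arguments are illustrative only:

# Placeholder value; the deployed migration console already exports FETCH_MIGRATION_COMMAND
export FETCH_MIGRATION_COMMAND="aws ecs run-task --cluster <cluster> --task-definition fetch-migration"

# Print (but do not run) the ECS command with a dry-run flag appended
./showFetchMigrationCommand.sh --dryrun

# Expected shape of the printed command, which the user copies and runs themselves:
#   aws ecs run-task --cluster <cluster> --task-definition fetch-migration --overrides '{ "containerOverrides": [ { "name": "fetch-migration", "command": ["--dryrun"] }]}'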

---------

Signed-off-by: Kartik Ganesh <[email protected]>
kartg authored Nov 17, 2023
1 parent b41128e commit c4eb40e
Showing 6 changed files with 73 additions and 9 deletions.
3 changes: 1 addition & 2 deletions FetchMigration/Dockerfile
@@ -19,5 +19,4 @@ RUN echo "ssl: false" > $DATA_PREPPER_PATH/config/data-prepper-config.yaml
RUN echo "metricRegistries: [Prometheus]" >> $DATA_PREPPER_PATH/config/data-prepper-config.yaml

# Include the -u flag to have unbuffered stdout (for detached mode)
# "$@" allows passing extra args/flags to the entrypoint when the image is run
ENTRYPOINT python3 -u ./fetch_orchestrator.py --insecure $DATA_PREPPER_PATH $FM_CODE_PATH/input.yaml "$@"
ENTRYPOINT ["python3", "-u", "./fetch_orchestrator.py", "--insecure", "$DATA_PREPPER_PATH", "$FM_CODE_PATH/input.yaml"]
7 changes: 5 additions & 2 deletions FetchMigration/python/fetch_orchestrator.py
@@ -93,7 +93,9 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
report=True, dryrun=params.is_dry_run)
logging.info("Running metadata migration...\n")
metadata_migration_result = metadata_migration.run(metadata_migration_params)
if len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration():
if metadata_migration_result.target_doc_count == 0:
logging.warning("Target document count is zero, skipping data migration...")
elif len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration():
# Kick off a subprocess for Data Prepper
logging.info("Running Data Prepper...\n")
proc = subprocess.Popen(dp_exec_path)
@@ -143,7 +145,8 @@ def run(params: FetchOrchestratorParams) -> Optional[int]:
elif not os.path.exists(dp_path + __DP_EXECUTABLE_SUFFIX):
raise ValueError(f"Could not find {__DP_EXECUTABLE_SUFFIX} executable under Data Prepper install location")
pipeline_file = os.path.expandvars(cli_args.pipeline_file_path)
if not os.path.exists(pipeline_file):
# Raise error if pipeline file is neither on disk nor provided inline
if __get_env_string("INLINE_PIPELINE") is None and not os.path.exists(pipeline_file):
raise ValueError("Data Prepper pipeline file does not exist")
params = FetchOrchestratorParams(dp_path, pipeline_file, port=cli_args.port, insecure=cli_args.insecure,
dryrun=cli_args.dryrun, create_only=cli_args.createonly)
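A sketch of the scenario this check now allows: the pipeline is supplied through the INLINE_PIPELINE environment variable rather than as a file on disk. The variable name comes from the diff above; the image tag and the way the pipeline value is encoded are assumptions:

# Hypothetical: no pipeline file is present in the container, but INLINE_PIPELINE is set,
# so the on-disk existence check is skipped and the orchestrator proceeds
docker run -e INLINE_PIPELINE="<pipeline definition>" fetch-migration --dryrun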
8 changes: 4 additions & 4 deletions FetchMigration/python/metadata_migration.py
@@ -41,13 +41,13 @@ def write_output(yaml_data: dict, indices_to_migrate: set, output_path: str):

def print_report(diff: IndexDiff, total_doc_count: int): # pragma no cover
logging.info("Identical indices in the target cluster: " + utils.string_from_set(diff.identical_indices))
logging.info("Identical empty indices in the target cluster (data will be migrated): " +
logging.info("Identical empty indices in the target cluster (will be migrated): " +
utils.string_from_set(diff.identical_empty_indices))
logging.info("Indices present in both clusters with conflicting settings/mappings (data will not be migrated): " +
logging.info("Indices present in both clusters with conflicting settings/mappings (will NOT be migrated): " +
utils.string_from_set(diff.conflicting_indices))
logging.info("Indices to be created in the target cluster (data will be migrated): " +
logging.info("Indices to be created in the target cluster (will be migrated): " +
utils.string_from_set(diff.indices_to_create))
logging.info("Total number of documents to be moved: " + str(total_doc_count))
logging.info("Target document count: " + str(total_doc_count))


def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
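For reference, a run that migrates no data (e.g. --createonly or --dryrun) now reports along these lines; index names are elided here and the exact set formatting depends on utils.string_from_set:

Identical indices in the target cluster: <index names>
Identical empty indices in the target cluster (will be migrated): <index names>
Indices present in both clusters with conflicting settings/mappings (will NOT be migrated): <index names>
Indices to be created in the target cluster (will be migrated): <index names>
Target document count: <count>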
7 changes: 7 additions & 0 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
@@ -8,6 +8,7 @@
#

import copy
import logging
import os
import unittest
from unittest import mock
@@ -23,6 +24,12 @@


class TestFetchOrchestrator(unittest.TestCase):
# Run before each test
def setUp(self) -> None:
logging.disable(logging.CRITICAL)

def tearDown(self) -> None:
logging.disable(logging.NOTSET)

@patch('migration_monitor.run')
@patch('subprocess.Popen')
4 changes: 3 additions & 1 deletion
@@ -12,15 +12,17 @@ RUN mkdir /root/kafka-tools/aws
COPY runTestBenchmarks.sh /root/
COPY humanReadableLogs.py /root/
COPY catIndices.sh /root/
COPY showFetchMigrationCommand.sh /root/
COPY msk-iam-auth.properties /root/kafka-tools/aws
COPY kafkaCmdRef.md /root/kafka-tools
RUN chmod ug+x /root/runTestBenchmarks.sh
RUN chmod ug+x /root/humanReadableLogs.py
RUN chmod ug+x /root/catIndices.sh
RUN chmod ug+x /root/showFetchMigrationCommand.sh
WORKDIR /root/kafka-tools
# Get kafka distribution and unpack to 'kafka'
RUN wget -qO- https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz | tar --transform 's!^[^/]*!kafka!' -xvz
RUN wget -O kafka/libs/msk-iam-auth.jar https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.9/aws-msk-iam-auth-1.1.9-all.jar
WORKDIR /root

CMD tail -f /dev/null
CMD tail -f /dev/null
53 changes: 53 additions & 0 deletions showFetchMigrationCommand.sh
@@ -0,0 +1,53 @@
#!/bin/bash

# Ensure Fetch Migration command is available before proceeding
if [ -z "$FETCH_MIGRATION_COMMAND" ]; then
echo "Fetch Migration unavailable or not deployed, exiting..."
exit 1
fi

# ECS command overrides argument with placeholder for flags
OVERRIDES_ARG="--overrides '{ \"containerOverrides\": [ { \"name\": \"fetch-migration\", \"command\": <FLAGS> }]}'"

# Default values
create_only=false
dry_run=false

while [[ $# -gt 0 ]]; do
key="$1"
case $key in
--create-only)
create_only=true
shift
;;
--dryrun)
dry_run=true
shift
;;
*)
shift
;;
esac
done

# Build flags string
flags=""
if [ "$dry_run" = true ]; then
flags="--dryrun,${flags}"
fi
if [ "$create_only" = true ]; then
flags="--createonly,${flags}"
fi

command_to_run="$FETCH_MIGRATION_COMMAND"
# Only add overrides suffix if any flags were specified
if [ -n "$flags" ]; then
# Remove trailing ,
flags=${flags%?}
# Convert the comma-separated flags to a JSON array string
flags=$(echo -n "$flags" | jq -cRs 'split(",")')
# Replace placeholder value in overrides string with actual flags
command_to_run="$command_to_run ${OVERRIDES_ARG/<FLAGS>/$flags}"
fi

echo "$command_to_run"
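For reference, the jq step converts the comma-separated flag string into the JSON array that the ECS container command override expects; the transformation can be checked in isolation:

# Worked example of the flag conversion (result shown as a comment)
echo -n "--createonly,--dryrun" | jq -cRs 'split(",")'
# ["--createonly","--dryrun"]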
