
Commit

Merge branch 'main' into otel-instrumentation
Signed-off-by: Mikayla Thompson <[email protected]>
mikaylathompson committed Nov 13, 2023
2 parents aeec6f8 + bce4aed commit 3414f55
Showing 199 changed files with 3,892 additions and 2,028 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle-build-and-test.yml
@@ -20,7 +20,7 @@ jobs:
        working-directory: TrafficCapture

      - name: Run Tests with Coverage
        run: ./gradlew allTests jacocoTestReport --info
        run: ./gradlew test :trafficReplayer:longTest jacocoTestReport --info
        working-directory: TrafficCapture

      - name: Upload to Codecov
5 changes: 3 additions & 2 deletions FetchMigration/Dockerfile
@@ -18,5 +18,6 @@ ENV PATH=/root/.local:$PATH
RUN echo "ssl: false" > $DATA_PREPPER_PATH/config/data-prepper-config.yaml
RUN echo "metricRegistries: [Prometheus]" >> $DATA_PREPPER_PATH/config/data-prepper-config.yaml

# Include the -u flag to have stdout logged
ENTRYPOINT python3 -u ./fetch_orchestrator.py $DATA_PREPPER_PATH $FM_CODE_PATH/input.yaml http://localhost:4900
# Include the -u flag to have unbuffered stdout (for detached mode)
# "$@" allows passing extra args/flags to the entrypoint when the image is run
ENTRYPOINT python3 -u ./fetch_orchestrator.py --insecure $DATA_PREPPER_PATH $FM_CODE_PATH/input.yaml "$@"
5 changes: 4 additions & 1 deletion FetchMigration/python/dev-requirements.txt
@@ -1,2 +1,5 @@
coverage>=7.3.2
pur>=7.3.1
pur>=7.3.1
moto>=4.2.7
# Transitive dependency from moto, explicit version needed to mitigate CVE-2023-46136
werkzeug>=3.0.1
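
moto is presumably added to support unit tests that mock AWS when exercising the new SigV4 auth path; werkzeug is pinned only to address the CVE. A minimal test sketch follows (hypothetical file and test names, assuming moto's mock decorator supplies dummy AWS credentials to botocore):

    # test_sigv4_sketch.py -- hypothetical test, not part of this commit
    import unittest

    from moto import mock_sts

    import endpoint_utils


    class SigV4AuthTest(unittest.TestCase):
        @mock_sts
        def test_region_derived_from_host_url(self):
            # moto's mock supplies dummy credentials, so no real AWS account is needed
            plugin_config = {"hosts": ["https://vpc-test.us-east-1.es.amazonaws.com"], "aws_sigv4": True}
            auth = endpoint_utils.get_auth(plugin_config)
            self.assertIsNotNone(auth)
            self.assertEqual("us-east-1", auth.region)


    if __name__ == "__main__":
        unittest.main()
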
43 changes: 38 additions & 5 deletions FetchMigration/python/endpoint_info.py
@@ -1,8 +1,41 @@
from dataclasses import dataclass
from typing import Union

from requests_aws4auth import AWS4Auth

@dataclass

# Class that encapsulates endpoint information for an OpenSearch/Elasticsearch cluster
class EndpointInfo:
    url: str
    auth: tuple = None
    verify_ssl: bool = True
    # Private member variables
    __url: str
    # "|" operator is only supported in 3.10+
    __auth: Union[AWS4Auth, tuple, None]
    __verify_ssl: bool

    def __init__(self, url: str, auth: Union[AWS4Auth, tuple, None] = None, verify_ssl: bool = True):
        self.__url = url
        # Normalize url value to have trailing slash
        if not url.endswith("/"):
            self.__url += "/"
        self.__auth = auth
        self.__verify_ssl = verify_ssl

    def __eq__(self, obj):
        return isinstance(obj, EndpointInfo) and \
            self.__url == obj.__url and \
            self.__auth == obj.__auth and \
            self.__verify_ssl == obj.__verify_ssl

    def add_path(self, path: str) -> str:
        # Remove leading slash if present
        if path.startswith("/"):
            path = path[1:]
        return self.__url + path

    def get_url(self) -> str:
        return self.__url

    def get_auth(self) -> Union[AWS4Auth, tuple, None]:
        return self.__auth

    def is_verify_ssl(self) -> bool:
        return self.__verify_ssl
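
For illustration only (not part of this commit), a minimal usage sketch of the reworked EndpointInfo class:

    from endpoint_info import EndpointInfo

    # URLs are normalized to carry a trailing slash
    source = EndpointInfo("http://localhost:9200")
    assert source.get_url() == "http://localhost:9200/"
    # add_path() tolerates a leading slash on the path
    assert source.add_path("/_cat/indices") == "http://localhost:9200/_cat/indices"

    # Basic-auth credentials are passed as a (username, password) tuple
    target = EndpointInfo("https://target-cluster:9200/", auth=("admin", "admin"), verify_ssl=False)
    assert target.get_auth() == ("admin", "admin")
    assert not target.is_verify_ssl()
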
156 changes: 156 additions & 0 deletions FetchMigration/python/endpoint_utils.py
@@ -0,0 +1,156 @@
import re
from typing import Optional, Union

from requests_aws4auth import AWS4Auth
from botocore.session import Session

from endpoint_info import EndpointInfo

# Constants
SOURCE_KEY = "source"
SINK_KEY = "sink"
SUPPORTED_PLUGINS = ["opensearch", "elasticsearch"]
HOSTS_KEY = "hosts"
INSECURE_KEY = "insecure"
CONNECTION_KEY = "connection"
DISABLE_AUTH_KEY = "disable_authentication"
USER_KEY = "username"
PWD_KEY = "password"
AWS_SIGV4_KEY = "aws_sigv4"
AWS_REGION_KEY = "aws_region"
AWS_CONFIG_KEY = "aws"
AWS_CONFIG_REGION_KEY = "region"
IS_SERVERLESS_KEY = "serverless"
ES_SERVICE_NAME = "es"
AOSS_SERVICE_NAME = "aoss"
URL_REGION_PATTERN = re.compile(r"([\w-]*)\.(es|aoss)\.amazonaws\.com")


def __get_url(plugin_config: dict) -> str:
    # "hosts" can be a simple string, or an array of hosts for Logstash to hit.
    # This tool needs one accessible host, so we pick the first entry in the latter case.
    return plugin_config[HOSTS_KEY][0] if isinstance(plugin_config[HOSTS_KEY], list) else plugin_config[HOSTS_KEY]


# Helper function that attempts to extract the AWS region from a URL,
# assuming it is of the form *.<region>.<service>.amazonaws.com
def __derive_aws_region_from_url(url: str) -> Optional[str]:
    match = URL_REGION_PATTERN.search(url)
    if match:
        # Index 0 returns the entire match, index 1 returns only the first group
        return match.group(1)
    return None


def get_aws_region(plugin_config: dict) -> str:
    if plugin_config.get(AWS_SIGV4_KEY, False) and plugin_config.get(AWS_REGION_KEY, None) is not None:
        return plugin_config[AWS_REGION_KEY]
    elif plugin_config.get(AWS_CONFIG_KEY, None) is not None:
        aws_config = plugin_config[AWS_CONFIG_KEY]
        if not isinstance(aws_config, dict):
            raise ValueError("Unexpected value for 'aws' configuration")
        elif aws_config.get(AWS_CONFIG_REGION_KEY, None) is not None:
            return aws_config[AWS_CONFIG_REGION_KEY]
    # Region not explicitly defined, attempt to derive from URL
    derived_region = __derive_aws_region_from_url(__get_url(plugin_config))
    if derived_region is None:
        raise ValueError("No region configured for AWS SigV4 auth, or derivable from host URL")
    return derived_region


def __check_supported_endpoint(section_config: dict) -> Optional[tuple]:
    for supported_type in SUPPORTED_PLUGINS:
        if supported_type in section_config:
            return supported_type, section_config[supported_type]


# This config key may be either directly in the main dict (for sink)
# or inside a nested dict (for source). The default value is False.
def is_insecure(plugin_config: dict) -> bool:
    if INSECURE_KEY in plugin_config:
        return plugin_config[INSECURE_KEY]
    elif CONNECTION_KEY in plugin_config and INSECURE_KEY in plugin_config[CONNECTION_KEY]:
        return plugin_config[CONNECTION_KEY][INSECURE_KEY]
    return False


def validate_pipeline(pipeline: dict):
    if SOURCE_KEY not in pipeline:
        raise ValueError("Missing source configuration in Data Prepper pipeline YAML")
    if SINK_KEY not in pipeline:
        raise ValueError("Missing sink configuration in Data Prepper pipeline YAML")


def validate_auth(plugin_name: str, plugin_config: dict):
    # If auth is disabled, no further validation is required
    if plugin_config.get(DISABLE_AUTH_KEY, False):
        return
    # If AWS SigV4 is configured, validate region
    if plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
        # Raises a ValueError if region cannot be derived
        get_aws_region(plugin_config)
        return
    # Validate basic auth
    elif USER_KEY not in plugin_config:
        raise ValueError("Invalid auth configuration (no username) for plugin: " + plugin_name)
    elif PWD_KEY not in plugin_config:
        raise ValueError("Invalid auth configuration (no password for username) for plugin: " + plugin_name)


def get_supported_endpoint_config(pipeline_config: dict, section_key: str) -> tuple:
    # The value of each key may be a single plugin (as a dict) or a list of plugin configs
    supported_tuple = tuple()
    if isinstance(pipeline_config[section_key], dict):
        supported_tuple = __check_supported_endpoint(pipeline_config[section_key])
    elif isinstance(pipeline_config[section_key], list):
        for entry in pipeline_config[section_key]:
            supported_tuple = __check_supported_endpoint(entry)
            # Break out of the loop at the first supported type
            if supported_tuple:
                break
    if not supported_tuple:
        raise ValueError("Could not find any supported endpoints in section: " + section_key)
    # First tuple value is the plugin name, second value is the plugin config dict
    return supported_tuple[0], supported_tuple[1]


def get_aws_sigv4_auth(region: str, is_serverless: bool = False) -> AWS4Auth:
    credentials = Session().get_credentials()
    if not credentials:
        raise ValueError("Unable to fetch AWS session credentials for SigV4 auth")
    if is_serverless:
        return AWS4Auth(region=region, service=AOSS_SERVICE_NAME, refreshable_credentials=credentials)
    else:
        return AWS4Auth(region=region, service=ES_SERVICE_NAME, refreshable_credentials=credentials)


def get_auth(plugin_config: dict) -> Union[AWS4Auth, tuple, None]:
    # Basic auth
    if USER_KEY in plugin_config and PWD_KEY in plugin_config:
        return plugin_config[USER_KEY], plugin_config[PWD_KEY]
    elif plugin_config.get(AWS_SIGV4_KEY, False) or AWS_CONFIG_KEY in plugin_config:
        is_serverless = False
        # OpenSearch Serverless uses a different service name
        if AWS_CONFIG_KEY in plugin_config:
            aws_config = plugin_config[AWS_CONFIG_KEY]
            if isinstance(aws_config, dict) and aws_config.get(IS_SERVERLESS_KEY, False):
                is_serverless = True
        region = get_aws_region(plugin_config)
        return get_aws_sigv4_auth(region, is_serverless)
    return None


def get_endpoint_info_from_plugin_config(plugin_config: dict) -> EndpointInfo:
    # verify boolean will be the inverse of the insecure SSL key, if present
    should_verify = not is_insecure(plugin_config)
    return EndpointInfo(__get_url(plugin_config), get_auth(plugin_config), should_verify)


def get_endpoint_info_from_pipeline_config(pipeline_config: dict, section_key: str) -> EndpointInfo:
    # Raises a ValueError if no supported endpoints are found
    plugin_name, plugin_config = get_supported_endpoint_config(pipeline_config, section_key)
    if HOSTS_KEY not in plugin_config:
        raise ValueError("No hosts defined for plugin: " + plugin_name)
    # Raises a ValueError if there is an error in the auth configuration
    validate_auth(plugin_name, plugin_config)
    return get_endpoint_info_from_plugin_config(plugin_config)
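
For context (not part of this commit), a short sketch of how these helpers might be driven from a parsed Data Prepper pipeline; the pipeline content below is illustrative:

    import yaml

    import endpoint_utils

    PIPELINE_YAML = """
    test-pipeline:
      source:
        opensearch:
          hosts: ["https://source-cluster:9200"]
          username: "admin"
          password: "admin"
      sink:
        - opensearch:
            hosts: ["https://target-cluster:9200"]
            disable_authentication: true
    """

    # The pipeline has a single top-level value; its children hold the "source" and "sink" sections
    pipeline = next(iter(yaml.safe_load(PIPELINE_YAML).values()))
    endpoint_utils.validate_pipeline(pipeline)
    source = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline, endpoint_utils.SOURCE_KEY)
    sink = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline, endpoint_utils.SINK_KEY)
    print(source.get_url())   # https://source-cluster:9200/ (basic-auth tuple attached)
    print(sink.get_url())     # https://target-cluster:9200/ (auth disabled)
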
107 changes: 89 additions & 18 deletions FetchMigration/python/fetch_orchestrator.py
@@ -2,33 +2,98 @@
import base64
import logging
import os
import re
import subprocess
import sys
from typing import Optional

import yaml

import endpoint_utils
import metadata_migration
import migration_monitor
from fetch_orchestrator_params import FetchOrchestratorParams
from metadata_migration_params import MetadataMigrationParams
from migration_monitor_params import MigrationMonitorParams

__PROTOCOL_PREFIX_PATTERN = re.compile(r"^https?://")
__HTTPS_PREFIX = "https://"
__DP_EXECUTABLE_SUFFIX = "/bin/data-prepper"
__PIPELINE_OUTPUT_FILE_SUFFIX = "/pipelines/pipeline.yaml"


def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[int]:
    dp_exec_path = dp_base_path + __DP_EXECUTABLE_SUFFIX
    output_file = dp_base_path + __PIPELINE_OUTPUT_FILE_SUFFIX
    metadata_migration_params = MetadataMigrationParams(dp_config_file, output_file, report=True)
    logging.info("Running pre-migration steps...\n")
def __get_env_string(name: str) -> Optional[str]:
    val: str = os.environ.get(name, "")
    if len(val) > 0:
        return val
    else:
        return None


def update_target_host(dp_config: dict, target_host: str):
    # Inline target host only supports HTTPS, so force it
    target_with_protocol = target_host
    match = __PROTOCOL_PREFIX_PATTERN.match(target_with_protocol)
    if match:
        target_with_protocol = target_host[match.end():]
    target_with_protocol = __HTTPS_PREFIX + target_with_protocol
    if len(dp_config) > 0:
        # We expect the Data Prepper pipeline to only have a single top-level value
        pipeline_config = next(iter(dp_config.values()))
        # The entire pipeline will be validated later
        if endpoint_utils.SINK_KEY in pipeline_config:
            # throws ValueError if no supported endpoints are found
            plugin_name, plugin_config = endpoint_utils.get_supported_endpoint_config(pipeline_config,
                                                                                      endpoint_utils.SINK_KEY)
            plugin_config[endpoint_utils.HOSTS_KEY] = [target_with_protocol]
            pipeline_config[endpoint_utils.SINK_KEY] = [{plugin_name: plugin_config}]


def write_inline_pipeline(pipeline_file_path: str, inline_pipeline: str, inline_target_host: Optional[str]):
    pipeline_yaml = yaml.safe_load(base64.b64decode(inline_pipeline))
    if inline_target_host is not None:
        update_target_host(pipeline_yaml, inline_target_host)
    with open(pipeline_file_path, 'w') as out_file:
        # Note - this does not preserve comments
        yaml.safe_dump(pipeline_yaml, out_file)


def write_inline_target_host(pipeline_file_path: str, inline_target_host: str):
    with open(pipeline_file_path, 'r+') as pipeline_file:
        pipeline_yaml = yaml.safe_load(pipeline_file)
        update_target_host(pipeline_yaml, inline_target_host)
        # Note - this does not preserve comments
        yaml.safe_dump(pipeline_yaml, pipeline_file)


def run(params: FetchOrchestratorParams) -> Optional[int]:
    # This is expected to be a base64 encoded string
    inline_pipeline = __get_env_string("INLINE_PIPELINE")
    inline_target_host = __get_env_string("INLINE_TARGET_HOST")
    if inline_pipeline is not None:
        write_inline_pipeline(params.pipeline_file_path, inline_pipeline, inline_target_host)
    elif inline_target_host is not None:
        write_inline_target_host(params.pipeline_file_path, inline_target_host)
    dp_exec_path = params.data_prepper_path + __DP_EXECUTABLE_SUFFIX
    output_file = params.data_prepper_path + __PIPELINE_OUTPUT_FILE_SUFFIX
    if params.is_dry_run:
        logging.info("Dry-run flag enabled, no actual changes will be made\n")
    elif params.is_create_only:
        logging.info("Create-only flag enabled, will only perform metadata migration\n")
    metadata_migration_params = MetadataMigrationParams(params.pipeline_file_path, output_file,
                                                        report=True, dryrun=params.is_dry_run)
    logging.info("Running metadata migration...\n")
    metadata_migration_result = metadata_migration.run(metadata_migration_params)
    if len(metadata_migration_result.created_indices) > 0:
    if len(metadata_migration_result.migration_indices) > 0 and not params.is_only_metadata_migration():
        # Kick off a subprocess for Data Prepper
        logging.info("Running Data Prepper...\n")
        proc = subprocess.Popen(dp_exec_path)
        # Run the migration monitor next
        migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
        migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count,
                                                          params.get_local_endpoint())
        logging.info("Starting migration monitor...\n")
        return migration_monitor.run(migration_monitor_params, proc)
    logging.info("Fetch Migration workflow concluded\n")


if __name__ == '__main__': # pragma no cover
@@ -46,22 +111,28 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[int]:
help="Path to the base directory where Data Prepper is installed "
)
arg_parser.add_argument(
"config_file_path",
"pipeline_file_path",
help="Path to the Data Prepper pipeline YAML file to parse for source and target endpoint information"
)
# Optional positional argument
arg_parser.add_argument(
"data_prepper_endpoint",
help="Data Prepper endpoint for monitoring the migration"
"port", type=int,
nargs='?', default=4900,
help="Local port at which the Data Prepper process will expose its APIs"
)
# Flags
arg_parser.add_argument("--insecure", "-k", action="store_true",
help="Specifies that the local Data Prepper process is not using SSL")
arg_parser.add_argument("--dryrun", action="store_true",
help="Performs a dry-run. Only a report is printed - no indices are created or migrated")
arg_parser.add_argument("--createonly", "-c", action="store_true",
help="Skips data migration and only creates indices on the target cluster")
cli_args = arg_parser.parse_args()
base_path = os.path.expandvars(cli_args.data_prepper_path)

inline_pipeline = os.environ.get("INLINE_PIPELINE", None)
if inline_pipeline is not None:
decoded_bytes = base64.b64decode(inline_pipeline)
with open(cli_args.config_file_path, 'wb') as config_file:
config_file.write(decoded_bytes)
return_code = run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
params = FetchOrchestratorParams(os.path.expandvars(cli_args.data_prepper_path),
os.path.expandvars(cli_args.pipeline_file_path),
port=cli_args.port, insecure=cli_args.insecure,
dryrun=cli_args.dryrun, create_only=cli_args.createonly)
return_code = run(params)
if return_code == 0:
sys.exit(0)
else:
(Diff truncated; the remaining changed files in this commit are not shown.)
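
For reference (not part of this commit), the INLINE_PIPELINE value that fetch_orchestrator.py now consumes is simply a base64-encoded pipeline YAML; a minimal sketch of producing it (file name illustrative):

    # Hypothetical helper for preparing the INLINE_PIPELINE environment variable
    import base64

    with open("pipeline.yaml", "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")

    # fetch_orchestrator.run() base64-decodes INLINE_PIPELINE, optionally rewrites the
    # sink host from INLINE_TARGET_HOST (forcing an https:// prefix), and writes the
    # result to the pipeline file before launching Data Prepper.
    print(f"INLINE_PIPELINE={encoded}")
    print("INLINE_TARGET_HOST=target-cluster.example.com:9200")
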
