[Feature] Add Basic Support for OpenSearch Ingestion controls from the Migration Console #621

Merged 12 commits on May 1, 2024
2 changes: 1 addition & 1 deletion .flake8
@@ -8,7 +8,7 @@
# W504 - Line break occurred after a binary operator
ignore = E265,E402,E999,W293,W504
max-line-length = 120
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv
exclude = .tox,.git,*/migrations/*,*/static/CACHE/*,docs,node_modules,.venv,*/cdk.out/*

# F401 - Unused imports -- this is the only way to have a file-wide rule exception
per-file-ignores =
@@ -2,9 +2,11 @@ FROM ubuntu:jammy

ENV DEBIAN_FRONTEND noninteractive

COPY lib /root/lib

RUN apt-get update && \
apt-get install -y --no-install-recommends python3.9 python3-pip python3-dev openjdk-11-jre-headless wget gcc libc-dev git curl vim jq unzip less && \
pip3 install urllib3 opensearch-benchmark==1.2.0 awscurl tqdm awscli
pip3 install urllib3 opensearch-benchmark==1.2.0 awscurl tqdm awscli -r /root/lib/osiMigrationLib/requirements.txt
RUN mkdir /root/kafka-tools
RUN mkdir /root/kafka-tools/aws

@@ -25,6 +27,8 @@ COPY simpleDocumentGenerator.py /root/
COPY catIndices.sh /root/
COPY showFetchMigrationCommand.sh /root/
COPY setupIntegTests.sh /root/
COPY osiMigration.py /root/
COPY osiPipelineTemplate.yaml /root/
COPY msk-iam-auth.properties /root/kafka-tools/aws
COPY kafkaCmdRef.md /root/kafka-tools
COPY kafkaExport.sh /root/kafka-tools
@@ -34,6 +38,7 @@ RUN chmod ug+x /root/humanReadableLogs.py
RUN chmod ug+x /root/simpleDocumentGenerator.py
RUN chmod ug+x /root/catIndices.sh
RUN chmod ug+x /root/showFetchMigrationCommand.sh
RUN chmod ug+x /root/osiMigration.py
RUN chmod ug+x /root/kafka-tools/kafkaExport.sh
Review comment (Collaborator): Minor: we should probably start doing all of these chmods in a single RUN line so we create fewer layers (and maybe the COPYs too), but I don't think that's an issue right now.


CMD tail -f /dev/null
@@ -0,0 +1,2 @@
boto3
coloredlogs

Large diffs are not rendered by default.

@@ -0,0 +1,27 @@
version: "2"
# NOTE: Placeholder values will be automatically populated and do not need to be changed
<AWS_SECRET_CONFIG_PLACEHOLDER>

historical-data-migration:

  # Source cluster configuration
  source:
    opensearch:
      hosts:
      - <SOURCE_CLUSTER_ENDPOINT_PLACEHOLDER>
      indices:
        # Indices to exclude - exclude system indices by default
        exclude:
        - index_name_regex: \.*
      <SOURCE_AUTH_OPTIONS_PLACEHOLDER>

  # Target cluster configuration
  sink:
  - opensearch:
      hosts:
      - <TARGET_CLUSTER_ENDPOINT_PLACEHOLDER>
      # Derive index name from record metadata
      index: ${getMetadata("opensearch-index")}
      # Use the same document ID as the source cluster document
      document_id: ${getMetadata("opensearch-document_id")}
      <TARGET_AUTH_OPTIONS_PLACEHOLDER>
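The Python tooling that fills in these placeholders is not visible in the hunks shown here. As a rough illustration only, the TypeScript sketch below shows one way `<..._PLACEHOLDER>` tokens like the ones in this template could be substituted. The `fillPlaceholders` helper, the sample endpoint, and the decision to throw on a missing value are all assumptions, not the tool's actual behavior.

```typescript
// Hypothetical helper: replaces each <NAME_PLACEHOLDER> token with a supplied value.
// This is an illustration, not the logic used by the migration console tooling.
function fillPlaceholders(template: string, values: Record<string, string>): string {
  return template.replace(/<([A-Z_]+_PLACEHOLDER)>/g, (token: string, name: string) => {
    const value = values[name];
    if (value === undefined) {
      // Fail loudly rather than leave a raw placeholder in the pipeline config.
      throw new Error(`No value supplied for ${token}`);
    }
    return value;
  });
}

// A small excerpt shaped like the source section of the template.
const snippet = [
  "source:",
  "  opensearch:",
  "    hosts:",
  "    - <SOURCE_CLUSTER_ENDPOINT_PLACEHOLDER>",
].join("\n");

// The endpoint below is a made-up example value.
const filled = fillPlaceholders(snippet, {
  SOURCE_CLUSTER_ENDPOINT_PLACEHOLDER: "https://source.example.com:443",
});
console.log(filled);
```

Throwing on an unknown token is a design choice for the sketch: it surfaces a template/tool mismatch before a half-filled configuration is submitted anywhere.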
25 changes: 25 additions & 0 deletions deployment/cdk/opensearch-service-migration/README.md
@@ -196,6 +196,31 @@ With the [required setup](#importing-target-clusters) on the target cluster havi
The pipeline configuration file can be viewed (and updated) via AWS Secrets Manager.
Please note that it will be base64 encoded.

## Kicking off OpenSearch Ingestion Service

**Note**: Using OpenSearch Ingestion Service is currently an experimental feature that must be enabled with the `migrationConsoleEnableOSI` option. Currently, only migrations from a Managed OpenSearch Service source to a Managed OpenSearch Service target are supported.

After enabling the option and deploying the CDK, log in to the Migration Console:
```shell
# ./accessContainer.sh migration-console STAGE REGION
./accessContainer.sh migration-console dev us-east-1
```
If needed, modify `osiPipelineTemplate.yaml` on the Migration Console. Note: The file contains placeholder values, which the Python tool that uses this YAML file automatically replaces with the source/target endpoints and the corresponding auth options.

The OpenSearch Ingestion pipeline can then be created by running the command below with an existing source cluster endpoint:
```shell
./osiMigration.py create-pipeline-from-solution --source-endpoint=<SOURCE_ENDPOINT>
```

OpenSearch Ingestion pipelines begin running as soon as they are created. A running pipeline can be stopped with the following command:
```shell
./osiMigration.py stop-pipeline
```
A stopped pipeline can be restarted with the following command:
```shell
./osiMigration.py start-pipeline
```

## Kicking off Reindex from Snapshot (RFS)

When the RFS service gets deployed, it does not start running immediately. Instead, the user controls when they want to kick off a historical data migration.
1 change: 1 addition & 0 deletions deployment/cdk/opensearch-service-migration/bin/app.ts
@@ -22,5 +22,6 @@ const customReplayerUserAgent = process.env.CUSTOM_REPLAYER_USER_AGENT
new StackComposer(app, {
migrationsAppRegistryARN: migrationsAppRegistryARN,
customReplayerUserAgent: customReplayerUserAgent,
migrationsSolutionVersion: version,
env: { account: account, region: region }
});
@@ -32,7 +32,7 @@ export class MSKUtilityStack extends Stack {
const lambdaInvokeStatement = new PolicyStatement({
effect: Effect.ALLOW,
actions: ["lambda:InvokeFunction"],
resources: [`arn:aws:lambda:${props.env?.region}:${props.env?.account}:function:OSMigrations*`]
resources: [`arn:aws:lambda:${this.region}:${this.account}:function:OSMigrations*`]
})
// Updating connectivity for an MSK cluster requires some VPC permissions
// (https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazonmanagedstreamingforapachekafka.html#amazonmanagedstreamingforapachekafka-cluster)
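Several hunks in this PR replace `props.env?.region` / `props.env?.account` with `this.region` / `this.account`. The PR text shown here does not state the motivation. One plausible reason is that `env` is optional on stack props, so interpolating it can bake the literal text `undefined` into an ARN, whereas `Stack.region` and `Stack.account` always return a string. The sketch below reproduces that failure mode in plain TypeScript; `PropsLike`, `EnvLike`, and both helper functions are hypothetical stand-ins, not CDK types.

```typescript
// Hypothetical stand-ins for CDK stack props; only the optional `env` shape matters here.
interface EnvLike {
  account?: string;
  region?: string;
}
interface PropsLike {
  env?: EnvLike;
}

// Mirrors the pre-change pattern: optional values interpolated straight into the ARN.
function lambdaArnFromProps(props: PropsLike): string {
  return `arn:aws:lambda:${props.env?.region}:${props.env?.account}:function:OSMigrations*`;
}

// Mirrors the post-change pattern: region and account are always strings.
function lambdaArnFromResolved(region: string, account: string): string {
  return `arn:aws:lambda:${region}:${account}:function:OSMigrations*`;
}

// With no env supplied, the template literal silently produces "undefined" segments.
const withoutEnv = lambdaArnFromProps({});
// With env supplied, both patterns agree.
const withEnv = lambdaArnFromProps({ env: { region: "us-east-1", account: "123456789012" } });
const resolved = lambdaArnFromResolved("us-east-1", "123456789012");

console.log(withoutEnv); // arn:aws:lambda:undefined:undefined:function:OSMigrations*
console.log(withEnv === resolved); // true
```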
@@ -5,20 +5,24 @@ import {Construct} from "constructs";
import {join} from "path";
import {MigrationServiceCore} from "./migration-service-core";
import {StringParameter} from "aws-cdk-lib/aws-ssm";
import {Effect, PolicyStatement} from "aws-cdk-lib/aws-iam";
import {Effect, PolicyStatement, Role, ServicePrincipal} from "aws-cdk-lib/aws-iam";
import {
createOpenSearchIAMAccessPolicy,
createOpenSearchServerlessIAMAccessPolicy
} from "../common-utilities";
import {StreamingSourceType} from "../streaming-source-type";
import {LogGroup, RetentionDays} from "aws-cdk-lib/aws-logs";
import {RemovalPolicy} from "aws-cdk-lib";


export interface MigrationConsoleProps extends StackPropsExt {
readonly migrationsSolutionVersion: string,
readonly vpc: IVpc,
readonly streamingSourceType: StreamingSourceType,
readonly fetchMigrationEnabled: boolean,
readonly fargateCpuArch: CpuArchitecture,
readonly otelCollectorEnabled: boolean
readonly otelCollectorEnabled: boolean,
readonly migrationConsoleEnableOSI: boolean
}

export class MigrationConsoleStack extends MigrationServiceCore {
@@ -52,6 +56,45 @@ export class MigrationConsoleStack extends MigrationServiceCore {
return [mskClusterAdminPolicy, mskTopicAdminPolicy, mskConsumerGroupAdminPolicy]
}

configureOpenSearchIngestionPipelineRole(stage: string, deployId: string) {
const osiPipelineRole = new Role(this, 'osisPipelineRole', {
assumedBy: new ServicePrincipal('osis-pipelines.amazonaws.com'),
description: 'OpenSearch Ingestion Pipeline role for OpenSearch Migrations'
});
// Add policy to allow access to Opensearch domains
osiPipelineRole.addToPolicy(new PolicyStatement({
effect: Effect.ALLOW,
actions: ["es:DescribeDomain", "es:ESHttp*"],
resources: [`arn:aws:es:${this.region}:${this.account}:domain/*`]
}))

new StringParameter(this, 'SSMParameterOSIPipelineRoleArn', {
description: 'OpenSearch Migration Parameter for OpenSearch Ingestion Pipeline Role ARN',
parameterName: `/migration/${stage}/${deployId}/osiPipelineRoleArn`,
stringValue: osiPipelineRole.roleArn
});
return osiPipelineRole.roleArn
}

createOpenSearchIngestionManagementPolicy(pipelineRoleArn: string): PolicyStatement[] {
const allMigrationPipelineArn = `arn:aws:osis:${this.region}:${this.account}:pipeline/*`
const osiManagementPolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [allMigrationPipelineArn],
actions: [
"osis:*"
]
})
const passPipelineRolePolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [pipelineRoleArn],
actions: [
"iam:PassRole"
]
})
return [osiManagementPolicy, passPipelineRolePolicy]
}

constructor(scope: Construct, id: string, props: MigrationConsoleProps) {
super(scope, id, props)
let securityGroups = [
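To make the scope of the two statements returned by `createOpenSearchIngestionManagementPolicy` easier to see, the sketch below rebuilds them as plain objects and prints them as JSON. It is an approximation for reading purposes: `IamStatement` and `osiManagementStatements` are hypothetical names, the account ID and role ARN are made-up sample values, and the JSON that CDK actually synthesizes may differ in shape.

```typescript
// Simplified shape of an IAM policy statement; this is not the CDK PolicyStatement class.
interface IamStatement {
  Effect: "Allow";
  Action: string[];
  Resource: string[];
}

// Hypothetical helper mirroring the two statements in the diff with plain objects.
function osiManagementStatements(region: string, account: string, pipelineRoleArn: string): IamStatement[] {
  return [
    {
      // All OSI actions, on every pipeline in this account and region.
      Effect: "Allow",
      Action: ["osis:*"],
      Resource: [`arn:aws:osis:${region}:${account}:pipeline/*`],
    },
    {
      // PassRole is limited to the single pipeline role created by the stack.
      Effect: "Allow",
      Action: ["iam:PassRole"],
      Resource: [pipelineRoleArn],
    },
  ];
}

// Sample values only; none of these come from the PR.
const statements = osiManagementStatements(
  "us-east-1",
  "123456789012",
  "arn:aws:iam::123456789012:role/example-osi-pipeline-role",
);
console.log(JSON.stringify(statements, null, 2));
```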
@@ -81,7 +124,7 @@ export class MigrationConsoleStack extends MigrationServiceCore {
readOnly: false,
sourceVolume: volumeName
}
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${props.env?.region}:${props.env?.account}:file-system/${volumeId}`
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${this.region}:${this.account}:file-system/${volumeId}`
const replayerOutputMountPolicy = new PolicyStatement( {
effect: Effect.ALLOW,
resources: [replayerOutputEFSArn],
@@ -91,10 +134,10 @@ export class MigrationConsoleStack extends MigrationServiceCore {
]
})

const ecsClusterArn = `arn:aws:ecs:${props.env?.region}:${props.env?.account}:service/migration-${props.stage}-ecs-cluster`
const ecsClusterArn = `arn:aws:ecs:${this.region}:${this.account}:service/migration-${props.stage}-ecs-cluster`
const allReplayerServiceArn = `${ecsClusterArn}/migration-${props.stage}-traffic-replayer*`
const reindexFromSnapshotServiceArn = `${ecsClusterArn}/migration-${props.stage}-reindex-from-snapshot`
const updateReplayerServicePolicy = new PolicyStatement({
const ecsUpdateServicePolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [allReplayerServiceArn, reindexFromSnapshotServiceArn],
actions: [
@@ -112,14 +155,34 @@ export class MigrationConsoleStack extends MigrationServiceCore {
]
})

// Allow Console to determine proper subnets to use for any resource creation
const describeVPCPolicy = new PolicyStatement( {
effect: Effect.ALLOW,
resources: ["*"],
actions: [
"ec2:DescribeSubnets",
"ec2:DescribeRouteTables"
]
})

// Allow Console to retrieve SSM Parameters
const getSSMParamsPolicy = new PolicyStatement({
effect: Effect.ALLOW,
resources: [`arn:aws:ssm:${this.region}:${this.account}:parameter/migration/${props.stage}/${props.defaultDeployId}/*`],
actions: [
"ssm:GetParameters"
]
})

const environment: { [key: string]: string; } = {
"MIGRATION_DOMAIN_ENDPOINT": osClusterEndpoint,
"MIGRATION_KAFKA_BROKER_ENDPOINTS": brokerEndpoints,
"MIGRATION_STAGE": props.stage
"MIGRATION_STAGE": props.stage,
"MIGRATION_SOLUTION_VERSION": props.migrationsSolutionVersion
}
const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.region, this.account)
const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.region, this.account)
let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, updateReplayerServicePolicy, artifactS3PublishPolicy]
let servicePolicies = [replayerOutputMountPolicy, openSearchPolicy, openSearchServerlessPolicy, ecsUpdateServicePolicy, artifactS3PublishPolicy, describeVPCPolicy, getSSMParamsPolicy]
if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
const mskAdminPolicies = this.createMSKAdminIAMPolicies(props.stage, props.defaultDeployId)
servicePolicies = servicePolicies.concat(mskAdminPolicies)
@@ -149,6 +212,21 @@ export class MigrationConsoleStack extends MigrationServiceCore {
servicePolicies.push(fetchMigrationPassRolePolicy)
}

if (props.migrationConsoleEnableOSI) {
const pipelineRoleArn = this.configureOpenSearchIngestionPipelineRole(props.stage, props.defaultDeployId)
servicePolicies.push(...this.createOpenSearchIngestionManagementPolicy(pipelineRoleArn))
const osiLogGroup = new LogGroup(this, 'OSILogGroup', {
retention: RetentionDays.ONE_MONTH,
removalPolicy: RemovalPolicy.DESTROY,
logGroupName: `/migration/${props.stage}/${props.defaultDeployId}/openSearchIngestion`
});
new StringParameter(this, 'SSMParameterOSIPipelineLogGroupName', {
description: 'OpenSearch Migration Parameter for OpenSearch Ingestion Pipeline Log Group Name',
parameterName: `/migration/${props.stage}/${props.defaultDeployId}/osiPipelineLogGroupName`,
stringValue: osiLogGroup.logGroupName
});
}

this.createService({
serviceName: "migration-console",
dockerDirectoryPath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/migrationConsole"),
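Both OSI-related SSM parameters are written under `/migration/<stage>/<deployId>/`, the same prefix that the `getSSMParamsPolicy` resource grants read access to. The sketch below checks that relationship with plain string handling. The helper names, the sample stage and deploy ID, and the simplified trailing-wildcard match are assumptions for illustration; they are not IAM's full resource-matching rules.

```typescript
// Hypothetical helper; the path layout follows the parameterName values in the diff.
function migrationParameterName(stage: string, deployId: string, key: string): string {
  return `/migration/${stage}/${deployId}/${key}`;
}

// For names that start with "/", the SSM ARN appends the name directly after "parameter".
function ssmParameterArn(region: string, account: string, parameterName: string): string {
  return `arn:aws:ssm:${region}:${account}:parameter${parameterName}`;
}

// Simplified check that only understands a single trailing "*" in the policy resource.
function coveredByTrailingWildcard(arn: string, pattern: string): boolean {
  if (pattern.charAt(pattern.length - 1) !== "*") {
    return arn === pattern;
  }
  const prefix = pattern.slice(0, -1);
  return arn.slice(0, prefix.length) === prefix;
}

// Sample values only; none of these come from the PR.
const sampleRegion = "us-east-1";
const sampleAccount = "123456789012";
const sampleStage = "dev";
const sampleDeployId = "default";

// Same shape as the getSSMParamsPolicy resource in the diff.
const policyResource =
  `arn:aws:ssm:${sampleRegion}:${sampleAccount}:parameter/migration/${sampleStage}/${sampleDeployId}/*`;

// The two parameter keys introduced for OSI in this PR.
const osiParameterKeys = ["osiPipelineRoleArn", "osiPipelineLogGroupName"];
const coverage = osiParameterKeys.map((key) =>
  coveredByTrailingWildcard(
    ssmParameterArn(sampleRegion, sampleAccount, migrationParameterName(sampleStage, sampleDeployId, key)),
    policyResource,
  ),
);
console.log(coverage); // [ true, true ]
```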
@@ -198,7 +198,7 @@ export class MigrationServiceCore extends Stack {
const namespace = PrivateDnsNamespace.fromPrivateDnsNamespaceAttributes(this, "PrivateDNSNamespace", {
namespaceName: `migration.${props.stage}.local`,
namespaceId: namespaceId,
namespaceArn: `arn:aws:servicediscovery:${props.env?.region}:${props.env?.account}:namespace/${namespaceId}`
namespaceArn: `arn:aws:servicediscovery:${this.region}:${this.account}:namespace/${namespaceId}`
})
cloudMapOptions = {
name: props.serviceName,
@@ -45,7 +45,7 @@ export class ReindexFromSnapshotStack extends MigrationServiceCore {

const osClusterEndpoint = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/osClusterEndpoint`)
const s3Uri = `s3://migration-artifacts-${this.account}-${props.stage}-${this.region}/rfs-snapshot-repo`
let rfsCommand = `/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --s3-local-dir /tmp/s3_files --s3-repo-uri ${s3Uri} --s3-region ${props.env?.region} --snapshot-name rfs-snapshot --min-replicas 1 --enable-persistent-run --lucene-dir '/lucene' --source-host ${props.sourceEndpoint} --target-host ${osClusterEndpoint} --source-version es_7_10 --target-version os_2_11`
let rfsCommand = `/rfs-app/runJavaWithClasspath.sh com.rfs.ReindexFromSnapshot --s3-local-dir /tmp/s3_files --s3-repo-uri ${s3Uri} --s3-region ${this.region} --snapshot-name rfs-snapshot --min-replicas 1 --enable-persistent-run --lucene-dir '/lucene' --source-host ${props.sourceEndpoint} --target-host ${osClusterEndpoint} --source-version es_7_10 --target-version os_2_11`
rfsCommand = props.extraArgs ? rfsCommand.concat(` ${props.extraArgs}`) : rfsCommand

this.createService({
@@ -53,7 +53,7 @@ export class TrafficReplayerStack extends MigrationServiceCore {
readOnly: false,
sourceVolume: volumeName
}
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${props.env?.region}:${props.env?.account}:file-system/${volumeId}`
const replayerOutputEFSArn = `arn:aws:elasticfilesystem:${this.region}:${this.account}:file-system/${volumeId}`
const replayerOutputMountPolicy = new PolicyStatement( {
effect: Effect.ALLOW,
resources: [replayerOutputEFSArn],