Skip to content

Commit

Permalink
Capture the created role arn for snapshotting into the AWS yaml param…
Browse files Browse the repository at this point in the history
…eter and pass it through the command line (opensearch-project#1133)

* Capture the created role arn for snapshotting into the AWS yaml parameter and pass it through the command line

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn authored Nov 13, 2024
1 parent b62778e commit 5514bc7
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ Exactly one of the following blocks must be present:
- `s3`:
- `repo_uri`: required, `s3://` path to where the snapshot repo exists or should be created (the bucket must already exist, and the repo needs to be configured on the source cluster)
- `aws_region`: required, region for the s3 bucket
- `role`: optional, required for clusters managed by Amazon OpenSearch Service. The IAM Role that is passed to the source cluster for the service to assume in order to work with the snapshot bucket.

- `fs`:
- `repo_path`: required, path to where the repo exists or should be created on the filesystem (the repo needs to be configured on the source cluster).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
'schema': {
'repo_uri': {'type': 'string', 'required': True},
'aws_region': {'type': 'string', 'required': True},
'role': {'type': 'string', 'required': False}
}
},
'fs': {
Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(self, config: Dict, source_cluster: Cluster) -> None:
self.snapshot_name = config['snapshot_name']
self.otel_endpoint = config.get("otel_endpoint", None)
self.s3_repo_uri = config['s3']['repo_uri']
self.s3_role_arn = config['s3'].get('role')
self.s3_region = config['s3']['aws_region']

def create(self, *args, **kwargs) -> CommandResult:
Expand All @@ -131,6 +133,8 @@ def create(self, *args, **kwargs) -> CommandResult:
command_args["--no-wait"] = FlagOnlyArgument
if max_snapshot_rate_mb_per_node is not None:
command_args["--max-snapshot-rate-mb-per-node"] = max_snapshot_rate_mb_per_node
if self.s3_role_arn:
command_args["--s3-role-arn"] = self.s3_role_arn
if extra_args:
for arg in extra_args:
command_args[arg] = FlagOnlyArgument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,36 @@ def test_s3_snapshot_create_calls_subprocess_run_with_correct_args(mocker):
"--max-snapshot-rate-mb-per-node", str(max_snapshot_rate),
], stdout=None, stderr=None, text=True, check=True)

def test_s3_snapshot_create_calls_subprocess_run_with_correct_s3_role(mocker):
s3_role = "arn:aws:iam::123456789012:role/OSMigrations-dev-us-west-1-default-SnapshotRole"
config = {
"snapshot": {
"snapshot_name": "reindex_from_snapshot",
"s3": {
"repo_uri": "s3://my-snapshot-bucket",
"aws_region": "us-east-2",
"role": s3_role
}
}
}
max_snapshot_rate = 100
source = create_valid_cluster(auth_type=AuthMethod.NO_AUTH)
snapshot = S3Snapshot(config["snapshot"], source)

mock = mocker.patch("subprocess.run")
snapshot.create(max_snapshot_rate_mb_per_node=max_snapshot_rate)

mock.assert_called_once_with(["/root/createSnapshot/bin/CreateSnapshot",
"--snapshot-name", config["snapshot"]["snapshot_name"],
"--source-host", source.endpoint,
"--source-insecure",
"--s3-repo-uri", config["snapshot"]["s3"]["repo_uri"],
"--s3-region", config["snapshot"]["s3"]["aws_region"],
"--no-wait",
"--max-snapshot-rate-mb-per-node", str(max_snapshot_rate),
"--s3-role-arn", s3_role,
], stdout=None, stderr=None, text=True, check=True)


def test_s3_snapshot_create_fails_for_clusters_with_auth(mocker):
config = {
Expand Down Expand Up @@ -310,7 +340,6 @@ def test_fs_snapshot_create_works_for_clusters_with_basic_auth(mocker):
"--max-snapshot-rate-mb-per-node", str(max_snapshot_rate),
], stdout=None, stderr=None, text=True, check=True)


def test_fs_snapshot_create_works_for_clusters_with_sigv4(mocker):
config = {
"snapshot": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export class FileSystemSnapshotYaml {
export class S3SnapshotYaml {
repo_uri = '';
aws_region = '';
role? = '';
}

export class SnapshotYaml {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,19 +253,10 @@ export class MigrationConsoleStack extends MigrationServiceCore {
}
}

const parameter = createMigrationStringParameter(this, servicesYaml.stringify(), {
...props,
parameter: MigrationSSMParameter.SERVICES_YAML_FILE,
const serviceTaskRole = new Role(this, 'MigrationServiceTaskRole', {
assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'),
description: 'Role for Migration Console ECS Tasks',
});
const environment: Record<string, string> = {
"MIGRATION_DOMAIN_ENDPOINT": osClusterEndpoint,
"MIGRATION_KAFKA_BROKER_ENDPOINTS": brokerEndpoints,
"MIGRATION_STAGE": props.stage,
"MIGRATION_SOLUTION_VERSION": props.migrationsSolutionVersion,
"MIGRATION_SERVICES_YAML_PARAMETER": parameter.parameterName,
"MIGRATION_SERVICES_YAML_HASH": hashStringSHA256(servicesYaml.stringify()),
"SHARED_LOGS_DIR_PATH": `${sharedLogFileSystem.mountPointPath}/migration-console-${props.defaultDeployId}`,
}

const openSearchPolicy = createOpenSearchIAMAccessPolicy(this.partition, this.region, this.account)
const openSearchServerlessPolicy = createOpenSearchServerlessIAMAccessPolicy(this.partition, this.region, this.account)
Expand All @@ -275,11 +266,36 @@ export class MigrationConsoleStack extends MigrationServiceCore {
...(getTargetSecretsPolicy ? [getTargetSecretsPolicy] : []),
...(getSourceSecretsPolicy ? [getSourceSecretsPolicy] : [])
]

if (props.streamingSourceType === StreamingSourceType.AWS_MSK) {
const mskAdminPolicies = this.createMSKAdminIAMPolicies(props.stage, props.defaultDeployId)
servicePolicies = servicePolicies.concat(mskAdminPolicies)
}

if (props.managedServiceSourceSnapshotEnabled &&
servicesYaml.snapshot &&
servicesYaml.snapshot.s3) {
servicesYaml.snapshot.s3.role =
createSnapshotOnAOSRole(this, artifactS3Arn, serviceTaskRole.roleArn,
this.region, props.stage, props.defaultDeployId)
.roleArn;
}

const parameter = createMigrationStringParameter(this, servicesYaml.stringify(), {
...props,
parameter: MigrationSSMParameter.SERVICES_YAML_FILE,
});
const environment: Record<string, string> = {
"MIGRATION_DOMAIN_ENDPOINT": osClusterEndpoint,
"MIGRATION_KAFKA_BROKER_ENDPOINTS": brokerEndpoints,
"MIGRATION_STAGE": props.stage,
"MIGRATION_SOLUTION_VERSION": props.migrationsSolutionVersion,
"MIGRATION_SERVICES_YAML_PARAMETER": parameter.parameterName,
"MIGRATION_SERVICES_YAML_HASH": hashStringSHA256(servicesYaml.stringify()),
"SHARED_LOGS_DIR_PATH": `${sharedLogFileSystem.mountPointPath}/migration-console-${props.defaultDeployId}`,
}


if (props.migrationAPIEnabled) {
servicePortMappings = [{
name: "migration-console-connect",
Expand Down Expand Up @@ -319,16 +335,13 @@ export class MigrationConsoleStack extends MigrationServiceCore {
volumes: [sharedLogFileSystem.asVolume()],
mountPoints: [sharedLogFileSystem.asMountPoint()],
environment: environment,
taskRole: serviceTaskRole,
taskRolePolicies: servicePolicies,
cpuArchitecture: props.fargateCpuArch,
taskCpuUnits: 1024,
taskMemoryLimitMiB: 2048,
...props
});

if (props.managedServiceSourceSnapshotEnabled) {
createSnapshotOnAOSRole(this, artifactS3Arn, this.serviceTaskRole.roleArn, this.region, props.stage, props.defaultDeployId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export interface MigrationServiceCoreProps extends StackPropsExt {
readonly cpuArchitecture: CpuArchitecture,
readonly dockerImageName: string,
readonly dockerImageCommand?: string[],
readonly taskRole?: Role,
readonly taskRolePolicies?: PolicyStatement[],
readonly mountPoints?: MountPoint[],
readonly volumes?: Volume[],
Expand All @@ -56,7 +57,7 @@ export class MigrationServiceCore extends Stack {
vpc: props.vpc
})

this.serviceTaskRole = createDefaultECSTaskRole(this, props.serviceName)
this.serviceTaskRole = props.taskRole ? props.taskRole : createDefaultECSTaskRole(this, props.serviceName)
props.taskRolePolicies?.forEach(policy => this.serviceTaskRole.addToPolicy(policy))

const serviceTaskDef = new FargateTaskDefinition(this, "ServiceTaskDef", {
Expand Down

0 comments on commit 5514bc7

Please sign in to comment.