Skip to content

Commit

Permalink
Add trafficReplayerMaxUptime parameter and functionality
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Apr 29, 2024
1 parent 60cd859 commit 73c1582
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import {
Ulimit,
OperatingSystemFamily,
Volume,
AwsLogDriverMode
AwsLogDriverMode,
ContainerDependencyCondition
} from "aws-cdk-lib/aws-ecs";
import {DockerImageAsset} from "aws-cdk-lib/aws-ecr-assets";
import {RemovalPolicy, Stack} from "aws-cdk-lib";
import {Duration, RemovalPolicy, Stack} from "aws-cdk-lib";
import {LogGroup, RetentionDays} from "aws-cdk-lib/aws-logs";
import {PolicyStatement} from "aws-cdk-lib/aws-iam";
import {CfnService as DiscoveryCfnService, PrivateDnsNamespace} from "aws-cdk-lib/aws-servicediscovery";
Expand Down Expand Up @@ -46,7 +47,8 @@ export interface MigrationServiceCoreProps extends StackPropsExt {
readonly taskCpuUnits?: number,
readonly taskMemoryLimitMiB?: number,
readonly taskInstanceCount?: number,
readonly ulimits?: Ulimit[]
readonly ulimits?: Ulimit[],
readonly maxUptime?: Duration
}

export class MigrationServiceCore extends Stack {
Expand Down Expand Up @@ -146,6 +148,38 @@ export class MigrationServiceCore extends Stack {
serviceContainer.addMountPoints(...props.mountPoints)
}

if (props.maxUptime) {
let maxUptimeSeconds = Math.max(props.maxUptime.toSeconds(), Duration.minutes(5).toSeconds());
let startupPeriodSeconds = 30;
// Add a separate container to monitor and fail healthcheck after a given maxUptime
const maxUptimeContainer = serviceTaskDef.addContainer("MaxUptimeContainer", {
image: ContainerImage.fromRegistry("public.ecr.aws/amazonlinux/amazonlinux:2023-minimal"),
memoryLimitMiB: 64,
entryPoint: [
"/bin/sh",
"-c",
"sleep infinity"
],
essential: true,
healthCheck: {
command: [
"CMD-SHELL",
"UPTIME=$(awk '{print int($1)}' /proc/uptime); " +
`test $UPTIME -gt ${startupPeriodSeconds} && ` +
`test $UPTIME -lt ${maxUptimeSeconds}`
],
timeout: Duration.seconds(2),
retries: 1,
startPeriod: Duration.seconds(startupPeriodSeconds * 2)
}
});
maxUptimeContainer.addContainerDependencies({
container: serviceContainer,
condition: ContainerDependencyCondition.START,
});
}


let cloudMapOptions: CloudMapOptions|undefined = undefined
if (props.serviceDiscoveryEnabled) {
const namespaceId = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/${props.defaultDeployId}/cloudMapNamespaceId`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
createOpenSearchServerlessIAMAccessPolicy
} from "../common-utilities";
import {StreamingSourceType} from "../streaming-source-type";
import { Duration } from "aws-cdk-lib";


export interface TrafficReplayerProps extends StackPropsExt {
Expand All @@ -23,7 +24,8 @@ export interface TrafficReplayerProps extends StackPropsExt {
readonly customKafkaGroupId?: string,
readonly userAgentSuffix?: string,
readonly extraArgs?: string,
readonly otelCollectorEnabled?: boolean
readonly otelCollectorEnabled?: boolean,
readonly maxUptime?: Duration
}

export class TrafficReplayerStack extends MigrationServiceCore {
Expand Down Expand Up @@ -109,5 +111,4 @@ export class TrafficReplayerStack extends MigrationServiceCore {
...props
});
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {Construct} from "constructs";
import {Stack, StackProps} from "aws-cdk-lib";
import {Duration, Stack, StackProps} from "aws-cdk-lib";
import {OpenSearchDomainStack} from "./opensearch-domain-stack";
import {EngineVersion, TLSSecurityPolicy} from "aws-cdk-lib/aws-opensearchservice";
import * as defaultValuesJson from "../default-values.json"
Expand Down Expand Up @@ -170,6 +170,7 @@ export class StackComposer {
const migrationConsoleServiceEnabled = this.getContextForType('migrationConsoleServiceEnabled', 'boolean', defaultValues, contextJSON)
const trafficReplayerServiceEnabled = this.getContextForType('trafficReplayerServiceEnabled', 'boolean', defaultValues, contextJSON)
const trafficReplayerEnableClusterFGACAuth = this.getContextForType('trafficReplayerEnableClusterFGACAuth', 'boolean', defaultValues, contextJSON)
const trafficReplayerMaxUptime = this.getContextForType('trafficReplayerMaxUptime', 'string', defaultValues, contextJSON);
const trafficReplayerGroupId = this.getContextForType('trafficReplayerGroupId', 'string', defaultValues, contextJSON)
const trafficReplayerUserAgentSuffix = this.getContextForType('trafficReplayerUserAgentSuffix', 'string', defaultValues, contextJSON)
const trafficReplayerExtraArgs = this.getContextForType('trafficReplayerExtraArgs', 'string', defaultValues, contextJSON)
Expand Down Expand Up @@ -448,6 +449,7 @@ export class StackComposer {
stage: stage,
defaultDeployId: defaultDeployId,
fargateCpuArch: fargateCpuArch,
maxUptime: trafficReplayerMaxUptime ? Duration.parse(trafficReplayerMaxUptime) : undefined,
...props,
})
this.addDependentStacks(trafficReplayerStack, [networkStack, migrationStack, mskUtilityStack,
Expand Down
1 change: 1 addition & 0 deletions deployment/cdk/opensearch-service-migration/options.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ These tables list all CDK context configuration values a user can specify for th
| trafficReplayerGroupId | string | "logging-group-default" | The Kafka consumer group ID the Replayer will use, if not specified a default ID will be used |
| trafficReplayerUserAgentSuffix | string | "AWS/test/v1.0.0" | A custom user agent that will be provided to the Replayer using the `--user-agent` flag. This will append the provided user agent to any existing user agents when requests are made to the target cluster. This setting could also be specified with the `trafficReplayerExtraArgs` option |
| trafficReplayerExtraArgs | string | "--sigv4-auth-header-service-region es,us-east-1 --speedup-factor 5" | Extra arguments to provide to the Replayer command. This includes auth header options and other parameters supported by the [Traffic Replayer](../../../TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java). |
| trafficReplayerMaxUptime | string | "PT2H" | The maximum uptime for the Traffic Replayer service, specified in ISO 8601 duration format. This controls how long the Traffic Replayer will run before automatically shutting down. Example values: "PT30M" (30 minutes), "PT2H" (2 hours). When this duration is reached, ECS will initiate the startup of a new Traffic Replayer task to ensure continuous operation. This mechanism ensures that the Traffic Replayer service can manage its resources effectively and prevent issues associated with long running processes. Set to the greater of the given value 5 minutes. |
| captureProxySourceEndpoint | string | `"https://my-source-cluster.com:443"` | The URI of the source cluster from which requests will be captured for. **Note**: This is only applicable to the standalone `capture-proxy` service |

### Fetch Migration Service Options
Expand Down

0 comments on commit 73c1582

Please sign in to comment.