
Commit b70650a
Merge branch 'main' into EnableGradleScan
Signed-off-by: Andre Kurait <[email protected]>
AndreKurait authored Feb 28, 2024
2 parents 2905609 + aff1b80 commit b70650a
Showing 31 changed files with 262 additions and 402 deletions.
7 changes: 7 additions & 0 deletions TrafficCapture/README.md
@@ -212,7 +212,14 @@ jsonMessageTransformers
nettyWireLogging
openSearch23PlusTargetTransformerProvider
replayerPlugins
testUtilities
trafficCaptureProxyServer
trafficCaptureProxyServerTest
trafficReplayer
```

To include a testFixture dependency, define the import like

```groovy
testImplementation testFixtures('org.opensearch.migrations.trafficcapture:trafficReplayer:0.1.0-SNAPSHOT')
```
6 changes: 1 addition & 5 deletions TrafficCapture/build.gradle
@@ -15,18 +15,14 @@ subprojects {
def excludedProjects = [
'buildSrc',
'dockerSolution',
// TODO: Get testFixtures exported to Maven
'testUtilities',
]
if (!(project.name in excludedProjects)) {
publishing {
publications {
mavenJava(MavenPublication) {
artifactId = project.name

from components.java

groupId = 'org.opensearch.migrations.trafficcapture'
group = 'org.opensearch.migrations.trafficcapture'
version = '0.1.0-SNAPSHOT'

// Suppress POM metadata warnings for test fixtures
2 changes: 1 addition & 1 deletion TrafficCapture/dockerSolution/build.gradle
@@ -68,7 +68,7 @@ dockerCompose {
"${project.getProperty('otel-collector')}" :
"otel-prometheus-jaeger-opensearch.yml"),
"${extensionsDir}" + (project.hasProperty("multiProxy") ? "proxy-multi.yml" : "proxy-single.yml")
]
]
}

task buildDockerImages {
@@ -55,94 +55,94 @@ public class CaptureProxy {
private static final String HTTPS_CONFIG_PREFIX = "plugins.security.ssl.http.";
public static final String DEFAULT_KAFKA_CLIENT_ID = "HttpCaptureProxyProducer";

static class Parameters {
public static class Parameters {
@Parameter(required = false,
names = {"--traceDirectory"},
arity = 1,
description = "Directory to store trace files in.")
String traceDirectory;
public String traceDirectory;
@Parameter(required = false,
names = {"--noCapture"},
arity = 0,
description = "If enabled, Does NOT capture traffic to ANY sink.")
boolean noCapture;
public boolean noCapture;
@Parameter(required = false,
names = {"--kafkaConfigFile"},
arity = 1,
description = "Kafka properties file for additional client customization.")
String kafkaPropertiesFile;
public String kafkaPropertiesFile;
@Parameter(required = false,
names = {"--kafkaClientId"},
arity = 1,
description = "clientId to use for interfacing with Kafka.")
String kafkaClientId = DEFAULT_KAFKA_CLIENT_ID;
public String kafkaClientId = DEFAULT_KAFKA_CLIENT_ID;
@Parameter(required = false,
names = {"--kafkaConnection"},
arity = 1,
description = "Sequence of <HOSTNAME:PORT> values delimited by ','.")
String kafkaConnection;
public String kafkaConnection;
@Parameter(required = false,
names = {"--enableMSKAuth"},
arity = 0,
description = "Enables SASL Kafka properties required for connecting to MSK with IAM auth.")
boolean mskAuthEnabled = false;
public boolean mskAuthEnabled = false;
@Parameter(required = false,
names = {"--sslConfigFile"},
arity = 1,
description = "YAML configuration of the HTTPS settings. When this is not set, the proxy will not use TLS.")
String sslConfigFilePath;
public String sslConfigFilePath;
@Parameter(required = false,
names = {"--maxTrafficBufferSize"},
arity = 1,
description = "The maximum number of bytes that will be written to a single TrafficStream object.")
int maximumTrafficStreamSize = 1024*1024;
public int maximumTrafficStreamSize = 1024*1024;
@Parameter(required = false,
names = {"--insecureDestination"},
arity = 0,
description = "Do not check the destination server's certificate")
boolean allowInsecureConnectionsToBackside;
public boolean allowInsecureConnectionsToBackside;
@Parameter(required = true,
names = {"--destinationUri"},
arity = 1,
description = "URI of the server that the proxy is capturing traffic for.")
String backsideUriString;
public String backsideUriString;
@Parameter(required = true,
names = {"--listenPort"},
arity = 1,
description = "Exposed port for clients to connect to this proxy.")
int frontsidePort = 0;
public int frontsidePort = 0;
@Parameter(required = false,
names = {"--numThreads"},
arity = 1,
description = "How many threads netty should create in its event loop group")
int numThreads = 1;
public int numThreads = 1;
@Parameter(required = false,
names = {"--destinationConnectionPoolSize"},
arity = 1,
description = "Number of socket connections that should be maintained to the destination server " +
"to reduce the perceived latency to clients. Each thread will have its own cache, so the " +
"total number of outstanding warm connections will be multiplied by numThreads.")
int destinationConnectionPoolSize = 0;
public int destinationConnectionPoolSize = 0;
@Parameter(required = false,
names = {"--destinationConnectionPoolTimeout"},
arity = 1,
description = "Of the socket connections maintained by the destination connection pool, " +
"how long after connection should the be recycled " +
"(closed with a new connection taking its place)")
String destinationConnectionPoolTimeout = "PT30S";
public String destinationConnectionPoolTimeout = "PT30S";
@Parameter(required = false,
names = {"--otelCollectorEndpoint"},
arity = 1,
description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be forwarded." +
"If this is not provided, metrics will not be sent to a collector.")
String otelCollectorEndpoint;
public String otelCollectorEndpoint;
@Parameter(required = false,
names = "--suppressCaptureForHeaderMatch",
arity = 2,
description = "The header name (which will be interpreted in a case-insensitive manner) and a regex " +
"pattern. When the incoming request has a header that matches the regex, it will be passed " +
"through to the service but will NOT be captured. E.g. user-agent 'healthcheck'.")
private List<String> suppressCaptureHeaderPairs = new ArrayList<>();
public List<String> suppressCaptureHeaderPairs = new ArrayList<>();
}

static Parameters parseArgs(String[] args) {
@@ -167,7 +167,7 @@ static Parameters parseArgs(String[] args) {
}

@SneakyThrows
private static Settings getSettings(@NonNull String configFile) {
protected static Settings getSettings(@NonNull String configFile) {
var builder = Settings.builder();
try (var lines = Files.lines(Paths.get(configFile))) {
lines
@@ -184,7 +184,7 @@ private static Settings getSettings(@NonNull String configFile) {
return builder.build();
}

private static IConnectionCaptureFactory<Object> getNullConnectionCaptureFactory() {
protected static IConnectionCaptureFactory<Object> getNullConnectionCaptureFactory() {
System.err.println("No trace log directory specified. Logging to /dev/null");
return ctx -> new StreamChannelConnectionCaptureSerializer<>(null, ctx.getConnectionId(),
new StreamLifecycleManager<>() {
@@ -201,7 +201,7 @@ public CompletableFuture<Object> closeStream(CodedOutputStreamHolder outputStrea
});
}

private static String getNodeId() {
protected static String getNodeId() {
return UUID.randomUUID().toString();
}

@@ -234,7 +234,7 @@ static Properties buildKafkaProperties(Parameters params) throws IOException {
return kafkaProps;
}

private static IConnectionCaptureFactory
protected static IConnectionCaptureFactory
getConnectionCaptureFactory(Parameters params, RootCaptureContext rootContext) throws IOException {
var nodeId = getNodeId();
// Resist the urge for now though until it comes in as a request/need.
@@ -252,7 +252,7 @@ static Properties buildKafkaProperties(Parameters params) throws IOException {

// Utility method for converting uri string to an actual URI object. Similar logic is placed in the trafficReplayer
// module: TrafficReplayer.java
private static URI convertStringToUri(String uriString) {
protected static URI convertStringToUri(String uriString) {
URI serverUri;
try {
serverUri = new URI(uriString);
@@ -274,7 +274,7 @@ private static URI convertStringToUri(String uriString) {
return serverUri;
}

private static SslContext loadBacksideSslContext(URI serverUri, boolean allowInsecureConnections) throws
protected static SslContext loadBacksideSslContext(URI serverUri, boolean allowInsecureConnections) throws
SSLException {
if (serverUri.getScheme().equalsIgnoreCase("https")) {
var sslContextBuilder = SslContextBuilder.forClient();
@@ -287,7 +287,7 @@ private static SslContext loadBacksideSslContext(URI serverUri, boolean allowIns
}
}

private static Map<String, String> convertPairListToMap(List<String> list) {
protected static Map<String, String> convertPairListToMap(List<String> list) {
var map = new TreeMap<String, String>();
for (int i=0; i<list.size(); i+=2) {
map.put(list.get(i), list.get(i+1));
@@ -16,10 +16,10 @@
import java.util.function.Supplier;

public class NettyScanningHttpProxy {
private final int proxyPort;
private Channel mainChannel;
private EventLoopGroup workerGroup;
private EventLoopGroup bossGroup;
protected final int proxyPort;
protected Channel mainChannel;
protected EventLoopGroup workerGroup;
protected EventLoopGroup bossGroup;

public NettyScanningHttpProxy(int proxyPort) {
this.proxyPort = proxyPort;
@@ -16,11 +16,11 @@
import java.util.function.Supplier;

public class ProxyChannelInitializer<T> extends ChannelInitializer<SocketChannel> {
private final IConnectionCaptureFactory<T> connectionCaptureFactory;
private final Supplier<SSLEngine> sslEngineProvider;
private final IRootWireLoggingContext rootContext;
private final BacksideConnectionPool backsideConnectionPool;
private final RequestCapturePredicate requestCapturePredicate;
protected final IConnectionCaptureFactory<T> connectionCaptureFactory;
protected final Supplier<SSLEngine> sslEngineProvider;
protected final IRootWireLoggingContext rootContext;
protected final BacksideConnectionPool backsideConnectionPool;
protected final RequestCapturePredicate requestCapturePredicate;

public ProxyChannelInitializer(IRootWireLoggingContext rootContext,
BacksideConnectionPool backsideConnectionPool,
2 changes: 1 addition & 1 deletion deployment/cdk/opensearch-service-migration/.gitignore
@@ -4,7 +4,7 @@
node_modules
cdk.context.json
coverage

dist
# CDK asset staging directory
.cdk.staging
cdk.out
1 change: 1 addition & 0 deletions deployment/cdk/opensearch-service-migration/index.ts
@@ -0,0 +1 @@
export * from './lib'
@@ -1,6 +1,7 @@
import {Effect, PolicyStatement, Role, ServicePrincipal} from "aws-cdk-lib/aws-iam";
import {Construct} from "constructs";
import {StringParameter} from "aws-cdk-lib/aws-ssm";
import {CpuArchitecture} from "aws-cdk-lib/aws-ecs";

export function createOpenSearchIAMAccessPolicy(region: string, accountId: string): PolicyStatement {
return new PolicyStatement({
@@ -120,4 +121,22 @@ export function createDefaultECSTaskRole(scope: Construct, serviceName: string):
]
}))
return serviceTaskRole
}

export function validateFargateCpuArch(cpuArch?: string): CpuArchitecture {
const desiredArch = cpuArch ? cpuArch : process.arch
const desiredArchUpper = desiredArch.toUpperCase()

if (desiredArchUpper === "X86_64" || desiredArchUpper === "X64") {
return CpuArchitecture.X86_64
} else if (desiredArchUpper === "ARM64") {
return CpuArchitecture.ARM64
} else {
if (cpuArch) {
throw new Error(`Unknown Fargate cpu architecture provided: ${desiredArch}`)
}
else {
throw new Error(`Unsupported process cpu architecture detected: ${desiredArch}, CDK requires X64 or ARM64 for Docker image compatability`)
}
}
}
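A minimal usage sketch of the new helper, assuming a file sitting alongside common-utilities.ts; the inputs below are illustrative only:

```typescript
import { validateFargateCpuArch } from "./common-utilities";

// Explicit values are case-insensitive and accept either the Node.js ("x64") or the ECS ("X86_64") spelling.
const intel = validateFargateCpuArch("x64");       // CpuArchitecture.X86_64
const graviton = validateFargateCpuArch("ARM64");  // CpuArchitecture.ARM64

// With no argument the helper falls back to process.arch, and throws if the host
// architecture is neither X64 nor ARM64.
const hostDefault = validateFargateCpuArch();

// Any other explicit value fails fast rather than producing an incompatible Docker image:
// validateFargateCpuArch("mips") -> Error: Unknown Fargate cpu architecture provided: mips

console.log(intel, graviton, hostDefault);
```

The resolved CpuArchitecture can then be passed straight through as the fargateCpuArch prop that the stacks below now require.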
@@ -3,9 +3,9 @@ import {IVpc} from "aws-cdk-lib/aws-ec2";
import {Construct} from "constructs";
import {
Cluster,
ContainerImage,
ContainerImage, CpuArchitecture,
FargateTaskDefinition,
LogDrivers,
LogDrivers, OperatingSystemFamily,
Secret as ECSSecret
} from "aws-cdk-lib/aws-ecs";
import {Secret as SMSecret} from "aws-cdk-lib/aws-secretsmanager";
@@ -22,7 +22,8 @@ import {
export interface FetchMigrationProps extends StackPropsExt {
readonly vpc: IVpc,
readonly dpPipelineTemplatePath: string,
readonly sourceEndpoint: string
readonly sourceEndpoint: string,
readonly fargateCpuArch: CpuArchitecture
}

export class FetchMigrationStack extends Stack {
@@ -49,6 +50,10 @@ export class FetchMigrationStack extends Stack {
ecsTaskRole.addToPolicy(openSearchServerlessPolicy)
// ECS Task Definition
const fetchMigrationFargateTask = new FargateTaskDefinition(this, "fetchMigrationFargateTask", {
runtimePlatform: {
operatingSystemFamily: OperatingSystemFamily.LINUX,
cpuArchitecture: props.fargateCpuArch
},
family: `migration-${props.stage}-${serviceName}`,
memoryLimitMiB: 8192,
cpu: 2048,
10 changes: 10 additions & 0 deletions deployment/cdk/opensearch-service-migration/lib/index.ts
@@ -0,0 +1,10 @@
export * from './common-utilities';
export * from './fetch-migration-stack';
export * from './lambda'
export * from './migration-assistance-stack';
export * from './msk-utility-stack';
export * from './network-stack';
export * from './opensearch-domain-stack';
export * from './service-stacks';
export * from './stack-composer';
export * from './streaming-source-type';
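Together with the new top-level index.ts shown earlier, this barrel file lets consumers import the stacks and utilities from the package root rather than from deep paths under lib/. A rough consumer-side sketch; the package name is assumed from the directory name and may differ from what is actually published:

```typescript
// Hypothetical consumer module; the package name below is an assumption.
import { FetchMigrationStack, validateFargateCpuArch } from "opensearch-service-migration";

// Anything re-exported through lib/index.ts (stacks, common utilities, lambda handlers,
// service stacks, streaming source types) now resolves from the package root.
const arch = validateFargateCpuArch("ARM64");
console.log(FetchMigrationStack.name, arch);
```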
@@ -0,0 +1,2 @@
export * as msk_ordered_endpoints_handler from './msk-ordered-endpoints-handler'
export * as msk_public_endpoint_handler from './msk-public-endpoint-handler'
@@ -1,10 +1,9 @@
import {StackPropsExt} from "../stack-composer";
import {IVpc, SecurityGroup} from "aws-cdk-lib/aws-ec2";
import {PortMapping, Protocol} from "aws-cdk-lib/aws-ecs";
import {CpuArchitecture, PortMapping, Protocol, ServiceConnectService} from "aws-cdk-lib/aws-ecs";
import {Construct} from "constructs";
import {join} from "path";
import {MigrationServiceCore} from "./migration-service-core";
import {ServiceConnectService} from "aws-cdk-lib/aws-ecs/lib/base/base-service";
import {StringParameter} from "aws-cdk-lib/aws-ssm";
import {StreamingSourceType} from "../streaming-source-type";
import {createMSKProducerIAMPolicies} from "../common-utilities";
@@ -14,6 +13,7 @@ export interface CaptureProxyESProps extends StackPropsExt {
readonly vpc: IVpc,
readonly streamingSourceType: StreamingSourceType,
readonly analyticsServiceEnabled: boolean,
readonly fargateCpuArch: CpuArchitecture,
readonly extraArgs?: string,
}

@@ -75,6 +75,7 @@ export class CaptureProxyESStack extends MigrationServiceCore {
serviceConnectServices: [serviceConnectService, esServiceConnectService],
serviceDiscoveryEnabled: true,
serviceDiscoveryPort: 19200,
cpuArchitecture: props.fargateCpuArch,
taskCpuUnits: 1024,
taskMemoryLimitMiB: 4096,
...props
@@ -1,10 +1,9 @@
import {StackPropsExt} from "../stack-composer";
import {IVpc, SecurityGroup} from "aws-cdk-lib/aws-ec2";
import {PortMapping, Protocol} from "aws-cdk-lib/aws-ecs";
import {CpuArchitecture, PortMapping, Protocol, ServiceConnectService} from "aws-cdk-lib/aws-ecs";
import {Construct} from "constructs";
import {join} from "path";
import {MigrationServiceCore} from "./migration-service-core";
import {ServiceConnectService} from "aws-cdk-lib/aws-ecs/lib/base/base-service";
import {StringParameter} from "aws-cdk-lib/aws-ssm";
import {StreamingSourceType} from "../streaming-source-type";
import {createMSKProducerIAMPolicies} from "../common-utilities";
@@ -13,6 +12,7 @@ import {createMSKProducerIAMPolicies} from "../common-utilities";
export interface CaptureProxyProps extends StackPropsExt {
readonly vpc: IVpc,
readonly streamingSourceType: StreamingSourceType,
readonly fargateCpuArch: CpuArchitecture,
readonly customSourceClusterEndpoint?: string,
readonly analyticsServiceEnabled?: boolean,
readonly extraArgs?: string,
@@ -60,6 +60,7 @@ export class CaptureProxyStack extends MigrationServiceCore {
taskRolePolicies: servicePolicies,
portMappings: [servicePort],
serviceConnectServices: [serviceConnectService],
cpuArchitecture: props.fargateCpuArch,
taskCpuUnits: 512,
taskMemoryLimitMiB: 2048,
...props