Skip to content

Commit

Permalink
load_tests: add tcp load tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Fala <[email protected]>
  • Loading branch information
matthewfala committed Sep 20, 2022
1 parent a60cc9b commit 4a14dc8
Show file tree
Hide file tree
Showing 25 changed files with 618 additions and 116 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
bin
integ/out
.venv
3 changes: 3 additions & 0 deletions load_tests/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
create_testing_resources/cdk.out
create_testing_resources/kinesis_s3_firehose/cdk.out
task_definitions/*_*m.json
89 changes: 47 additions & 42 deletions load_tests/create_testing_resources/kinesis_s3_firehose/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from logging import captureWarnings
import os
import json
from aws_cdk import (
Expand All @@ -8,6 +9,7 @@
core,
)

DESTINATION_LIST = ["", "std-"] # "" is the destination tag for logs coming from non-stdstream input
THROUGHPUT_LIST = json.loads(os.environ['THROUGHPUT_LIST'])
PLATFORM = os.environ['PLATFORM'].lower()
PREFIX= os.environ['PREFIX']
Expand All @@ -30,49 +32,52 @@ def __init__(self, scope: core.Construct, construct_id: str, **kwargs) -> None:
)

names = locals()
for throughput in THROUGHPUT_LIST:
# Data streams and related delivery streams for kinesis test
names[PLATFORM+'_kinesis_stream_'+throughput] = kinesis.Stream(self, PLATFORM+'KinesisStream'+throughput,
stream_name=PREFIX+PLATFORM+'-kinesisStream-'+throughput,
shard_count=100)
kinesis_policy = iam.Policy(self, 'kinesisPolicyfor'+throughput,
statements=[iam.PolicyStatement(actions=['kinesis:*'], resources=[names.get(PLATFORM+'_kinesis_stream_'+throughput).stream_arn])],
roles=[firehose_role],
)
names[PLATFORM+'_kinesis_test_delivery_stream_'+throughput] = firehose.CfnDeliveryStream(
self, PLATFORM+'KinesisTestDeliveryStream'+throughput,
delivery_stream_name=PREFIX+PLATFORM+'-kinesisTest-deliveryStream-'+throughput,
delivery_stream_type='KinesisStreamAsSource',
kinesis_stream_source_configuration=firehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
kinesis_stream_arn=names.get(PLATFORM+'_kinesis_stream_'+throughput).stream_arn,
role_arn=firehose_role.role_arn
),
s3_destination_configuration=firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
bucket_arn=bucket.bucket_arn,
buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
interval_in_seconds=60,
size_in_m_bs=50
),
compression_format='UNCOMPRESSED',
role_arn=firehose_role.role_arn,
prefix=f'kinesis-test/{PLATFORM}/{throughput}/'
))
names.get(PLATFORM+'_kinesis_test_delivery_stream_'+throughput).add_depends_on(kinesis_policy.node.default_child)
# Delivery streams for firehose test
names[PLATFORM+'_firehose_test_delivery_stream_'+throughput] = firehose.CfnDeliveryStream(
self, PLATFORM+'FirehoseTestDeliveryStream'+throughput,
delivery_stream_name=PREFIX+PLATFORM+'-firehoseTest-deliveryStream-'+throughput,
delivery_stream_type='DirectPut',
s3_destination_configuration=firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
bucket_arn=bucket.bucket_arn,
buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
interval_in_seconds=60,
size_in_m_bs=50
for destination in DESTINATION_LIST:
for throughput in THROUGHPUT_LIST:
caps_identifier = destination.capitalize().replace("-", "") + throughput.capitalize()
identifier = destination + throughput
# Data streams and related delivery streams for kinesis test
names[PLATFORM+'_kinesis_stream_'+identifier] = kinesis.Stream(self, PLATFORM+'KinesisStream'+caps_identifier,
stream_name=PREFIX+PLATFORM+'-kinesisStream-'+identifier,
shard_count=100)
kinesis_policy = iam.Policy(self, 'kinesisPolicyfor'+identifier,
statements=[iam.PolicyStatement(actions=['kinesis:*'], resources=[names.get(PLATFORM+'_kinesis_stream_'+identifier).stream_arn])],
roles=[firehose_role],
)
names[PLATFORM+'_kinesis_test_delivery_stream_'+identifier] = firehose.CfnDeliveryStream(
self, PLATFORM+'KinesisTestDeliveryStream'+caps_identifier,
delivery_stream_name=PREFIX+PLATFORM+'-kinesisTest-deliveryStream-'+identifier,
delivery_stream_type='KinesisStreamAsSource',
kinesis_stream_source_configuration=firehose.CfnDeliveryStream.KinesisStreamSourceConfigurationProperty(
kinesis_stream_arn=names.get(PLATFORM+'_kinesis_stream_'+identifier).stream_arn,
role_arn=firehose_role.role_arn
),
compression_format='UNCOMPRESSED',
role_arn=firehose_role.role_arn,
prefix=f'firehose-test/{PLATFORM}/{throughput}/'
))
s3_destination_configuration=firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
bucket_arn=bucket.bucket_arn,
buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
interval_in_seconds=60,
size_in_m_bs=50
),
compression_format='UNCOMPRESSED',
role_arn=firehose_role.role_arn,
prefix=f'kinesis-test/{PLATFORM}/{identifier}/'
))
names.get(PLATFORM+'_kinesis_test_delivery_stream_'+identifier).add_depends_on(kinesis_policy.node.default_child)
# Delivery streams for firehose test
names[PLATFORM+'_firehose_test_delivery_stream_'+identifier] = firehose.CfnDeliveryStream(
self, PLATFORM+'FirehoseTestDeliveryStream'+caps_identifier,
delivery_stream_name=PREFIX+PLATFORM+'-firehoseTest-deliveryStream-'+identifier,
delivery_stream_type='DirectPut',
s3_destination_configuration=firehose.CfnDeliveryStream.S3DestinationConfigurationProperty(
bucket_arn=bucket.bucket_arn,
buffering_hints=firehose.CfnDeliveryStream.BufferingHintsProperty(
interval_in_seconds=60,
size_in_m_bs=50
),
compression_format='UNCOMPRESSED',
role_arn=firehose_role.role_arn,
prefix=f'firehose-test/{PLATFORM}/{identifier}/'
))

# Add stack outputs
core.CfnOutput(self, 'S3BucketName',
Expand Down
Loading

0 comments on commit 4a14dc8

Please sign in to comment.