Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(core) add firehose delivery ingestion stage #71

Merged
merged 25 commits into from
Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4b477e9
adding initial primitive version of FirehoseFactory
malachi-constant Mar 29, 2022
7801573
updating poetry
malachi-constant Mar 29, 2022
50b8dd0
Merge branch 'main' of github.com:awslabs/aws-ddk into 70-core-add-fi…
malachi-constant Mar 29, 2022
516e9ce
working through destination specific config
malachi-constant Mar 29, 2022
eaf4e38
adding more destination props
malachi-constant Mar 29, 2022
60edc36
refacotring class
malachi-constant Mar 30, 2022
f0f120f
Adding firehose to s3 stage
malachi-constant Mar 30, 2022
1ce61c4
adding escape hatches in stage and docstrings
malachi-constant Mar 31, 2022
d3e5ebc
Merge branch 'main' of github.com:awslabs/aws-ddk into 70-core-add-fi…
malachi-constant Mar 31, 2022
081ec9a
adding additional destination params
malachi-constant Mar 31, 2022
7681ca2
adding some additional params to configure destination
malachi-constant Mar 31, 2022
f52c2c0
Merge branch 'main' of github.com:awslabs/aws-ddk into 70-core-add-fi…
malachi-constant Apr 1, 2022
694c110
updating docstring and ordering for params
malachi-constant Apr 1, 2022
6bc7c90
updating naming
malachi-constant Apr 1, 2022
1186ff7
adding firehose destination resource factory
malachi-constant Apr 1, 2022
1cad434
adding kinesis resource factory
malachi-constant Apr 1, 2022
b6ddacc
fixing conflicts
malachi-constant Apr 1, 2022
5041d1e
updating poetry deps
malachi-constant Apr 1, 2022
4da72be
adding data stream support
malachi-constant Apr 1, 2022
d740052
adding alarms and updating docs
malachi-constant Apr 1, 2022
515f697
handling review comments
malachi-constant Apr 4, 2022
275fcd9
adding additional params for s3 destinations and default for compress…
malachi-constant Apr 4, 2022
2d73127
adding additional destination params
malachi-constant Apr 4, 2022
b510062
updating naming and docs
malachi-constant Apr 4, 2022
4f33a9d
Minor - Renaming and standardizing
jaidisido Apr 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/aws_ddk_core/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

from aws_ddk_core.resources._glue import GlueFactory
from aws_ddk_core.resources._kinesis_firehose import KinesisFirehoseFactory
from aws_ddk_core.resources._kinesis_streams import KinesisStreamsFactory
from aws_ddk_core.resources._kms import KMSFactory
from aws_ddk_core.resources._lambda import LambdaFactory
from aws_ddk_core.resources._s3 import S3Factory
Expand All @@ -21,6 +23,8 @@

__all__ = [
"GlueFactory",
"KinesisFirehoseFactory",
"KinesisStreamsFactory",
"KMSFactory",
"LambdaFactory",
"S3Factory",
Expand Down
225 changes: 225 additions & 0 deletions core/aws_ddk_core/resources/_kinesis_firehose.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Any, Dict, Optional, Sequence

import aws_cdk.aws_kinesisfirehose_alpha as firehose
import aws_cdk.aws_kinesisfirehose_destinations_alpha as destinations
from aws_cdk.aws_iam import IRole
from aws_cdk.aws_kinesis import IStream
from aws_cdk.aws_kms import IKey
from aws_cdk.aws_logs import ILogGroup
from aws_cdk.aws_s3 import IBucket
from aws_ddk_core.config import Config
from aws_ddk_core.resources.commons import BaseSchema, Duration, Size
from constructs import Construct

_logger: logging.Logger = logging.getLogger(__name__)


class FirehoseDestinationSchema(BaseSchema):
    """Marshmallow schema for the configurable properties of a DDK Firehose destination."""

    # Each attribute below maps to the CDK Firehose destination keyword argument
    # of the same name; the custom field types convert the raw configuration
    # value into the matching CDK object when the schema is loaded.
    buffering_interval = Duration()
    buffering_size = Size()


class KinesisFirehoseFactory:
    """
    Factory for creating and configuring Kinesis Firehose DDK resources,
    including Delivery Streams and S3 delivery destinations.
    """

    @staticmethod
    def delivery_stream(
        scope: Construct,
        id: str,
        environment_id: str,
        destinations: Sequence[firehose.IDestination],
        delivery_stream_name: Optional[str] = None,
        encryption: Optional[firehose.StreamEncryption] = None,
        encryption_key: Optional[IKey] = None,
        role: Optional[IRole] = None,
        source_stream: Optional[IStream] = None,
        **firehose_props: Any,
    ) -> firehose.IDeliveryStream:
        """
        Create and configure Firehose delivery stream.

        Parameters
        ----------
        scope : Construct
            Scope within which this construct is defined
        id : str
            Identifier of the delivery stream
        environment_id : str
            Identifier of the environment.
            Currently not read by this method: no ddk.json configuration is loaded here.
        destinations: Sequence[firehose.IDestination]
            The destinations that this delivery stream will deliver data to
        delivery_stream_name: Optional[str] = None
            A name for the delivery stream
        encryption: Optional[firehose.StreamEncryption] = None
            Indicates the type of customer master key (CMK) to use for server-side encryption, if any.
            Default: StreamEncryption.UNENCRYPTED
        encryption_key: Optional[IKey] = None
            Customer managed key to server-side encrypt data in the stream.
            Default: - no KMS key will be used
        role: Optional[IRole] = None
            The IAM role associated with this delivery stream.
            Assumed by Kinesis Data Firehose to read from sources and encrypt data server-side.
            Default: - a role will be created with default permissions.
        source_stream: Optional[IStream] = None
            The Kinesis data stream to use as a source for this delivery stream
        **firehose_props: Any
            Additional properties. For complete list of properties refer to CDK Documentation -
            Firehose Delivery Stream:
            https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesisfirehose/DeliveryStream.html

        Returns
        -------
        delivery_stream: firehose.IDeliveryStream
            A Kinesis Firehose Delivery Stream
        """
        # Collect args. Extra keyword arguments are unpacked last, so on a key
        # clash they win over the named parameters.
        firehose_props = {
            "delivery_stream_name": delivery_stream_name,
            "destinations": destinations,
            "encryption": encryption,
            "encryption_key": encryption_key,
            "role": role,
            "source_stream": source_stream,
            **firehose_props,
        }

        # Create delivery stream
        _logger.debug(f"firehose_props: {firehose_props}")
        firehose_stream: firehose.IDeliveryStream = firehose.DeliveryStream(scope, id, **firehose_props)

        return firehose_stream

    @staticmethod
    def s3_destination(
        id: str,
        environment_id: str,
        bucket: IBucket,
        buffering_interval: Optional[Duration] = None,
        buffering_size: Optional[Size] = None,
        compression: Optional[destinations.Compression] = destinations.Compression.GZIP,
        data_output_prefix: Optional[str] = None,
        encryption_key: Optional[IKey] = None,
        error_output_prefix: Optional[str] = None,
        logging: Optional[bool] = True,
        log_group: Optional[ILogGroup] = None,
        processor: Optional[firehose.IDataProcessor] = None,
        role: Optional[IRole] = None,
        s3_backup: Optional[destinations.DestinationS3BackupProps] = None,
        **destination_props: Any,
    ) -> destinations.S3Bucket:
        """
        Create and configure Firehose delivery S3 destination.

        This construct allows to configure parameters of the firehose destination using ddk.json
        configuration file depending on the `environment_id` in which the function is used.
        Supported parameters are: `buffering_interval` and `buffering_size`

        Parameters
        ----------
        id : str
            Identifier of the destination
        environment_id : str
            Identifier of the environment
        bucket: IBucket
            S3 Bucket to use for the destination.
        buffering_interval: Optional[Duration] = None
            The length of time that Firehose buffers incoming data before delivering it to the S3 bucket.
            Minimum: Duration.seconds(60)
            Maximum: Duration.seconds(900)
            Default: Duration.seconds(300)
        buffering_size: Optional[Size] = None
            The size of the buffer that Kinesis Data Firehose uses for incoming data
            before delivering it to the S3 bucket.
            Minimum: Size.mebibytes(1)
            Maximum: Size.mebibytes(128)
            Default: Size.mebibytes(5)
        compression: Optional[Compression] = Compression.GZIP
            The type of compression that Kinesis Data Firehose uses to compress the data that it delivers
            to the Amazon S3 bucket.
            Default: Compression.GZIP
        data_output_prefix: Optional[str] = None
            A prefix that Kinesis Data Firehose evaluates and adds to records before writing them to S3
        encryption_key: Optional[IKey] = None
            The AWS KMS key used to encrypt the data that it delivers to your Amazon S3 bucket.
        error_output_prefix: Optional[str] = None
            A prefix that Kinesis Data Firehose evaluates and adds to failed records before writing them to S3.
            This prefix appears immediately following the bucket name
        logging: Optional[bool] = True
            If true, log errors when data transformation or data delivery fails.
            If logGroup is provided, this will be implicitly set to true.
            Default: true - errors are logged.
        log_group: Optional[ILogGroup] = None
            The CloudWatch log group where log streams will be created to hold error logs.
            Default: - if logging is set to true, a log group will be created for you.
        processor: Optional[IDataProcessor] = None
            The data transformation that should be performed on the data before writing to the destination.
        role: Optional[IRole] = None
            The IAM role associated with this destination.
            Assumed by Kinesis Data Firehose to invoke processors and write to destinations
        s3_backup: Optional[DestinationS3BackupProps] = None
            The configuration for backing up source records to S3.
        **destination_props: Any
            Additional properties. For complete list of properties refer to CDK Documentation -
            Firehose S3 Destinations:
            https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesisfirehose_destinations/S3Bucket.html

        Returns
        -------
        destination: destinations.S3Bucket
            A Kinesis Firehose S3 Delivery Destination
        """
        # NOTE: the `logging` parameter shadows the `logging` module inside this
        # method; only the module-level `_logger` is used for log output here.

        # Load and validate the config
        destination_config_props: Dict[str, Any] = FirehoseDestinationSchema().load(
            Config().get_resource_config(
                environment_id=environment_id,
                id=id,
            ),
            partial=["removal_policy"],
        )

        # Collect args. Extra keyword arguments are unpacked last, so on a key
        # clash they win over the named parameters.
        destination_props = {
            "buffering_interval": buffering_interval,
            "buffering_size": buffering_size,
            "compression": compression,
            "data_output_prefix": data_output_prefix,
            "encryption_key": encryption_key,
            "error_output_prefix": error_output_prefix,
            "logging": logging,
            "log_group": log_group,
            "processor": processor,
            "role": role,
            "s3_backup": s3_backup,
            **destination_props,
        }

        # Explicit ("hardcoded") props should always take precedence over config.
        # Values left as None are skipped so they do not erase configured values.
        destination_config_props.update(
            {key: value for key, value in destination_props.items() if value is not None}
        )

        # Create s3 destination. Log the merged properties, i.e. exactly what is
        # handed to the CDK construct, rather than only the explicit arguments.
        _logger.debug(f"firehose destination properties: {destination_config_props}")
        destination: destinations.S3Bucket = destinations.S3Bucket(bucket, **destination_config_props)

        return destination
129 changes: 129 additions & 0 deletions core/aws_ddk_core/resources/_kinesis_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Any, Dict, Optional

from aws_cdk.aws_kinesis import IStream, Stream, StreamEncryption, StreamMode
from aws_cdk.aws_kms import IKey
from aws_ddk_core.config import Config
from aws_ddk_core.resources.commons import BaseSchema, Duration
from constructs import Construct
from marshmallow import fields

_logger: logging.Logger = logging.getLogger(__name__)


class KinesisStreamsSchema(BaseSchema):
    """Marshmallow schema for the configurable properties of a DDK Kinesis data stream."""

    # Each attribute below maps to the CDK Stream keyword argument of the same name.
    retention_period = Duration()
    shard_count = fields.Int()


class KinesisStreamsFactory:
    """
    Factory for creating and configuring Kinesis DDK resources, including Data Streams.
    """

    @staticmethod
    def data_stream(
        scope: Construct,
        id: str,
        environment_id: str,
        encryption: Optional[StreamEncryption] = None,
        encryption_key: Optional[IKey] = None,
        retention_period: Optional[Duration] = None,
        shard_count: Optional[int] = None,
        stream_mode: Optional[StreamMode] = None,
        stream_name: Optional[str] = None,
        **kinesis_props: Any,
    ) -> IStream:
        """
        Create and configure Kinesis data stream.

        This construct allows to configure parameters of the Kinesis data stream using ddk.json
        configuration file depending on the `environment_id` in which the function is used.
        Supported parameters are: `retention_period` and `shard_count`.

        Parameters
        ----------
        scope : Construct
            Scope within which this construct is defined
        id : str
            Identifier of the data stream
        environment_id : str
            Identifier of the environment
        encryption: Optional[StreamEncryption] = None
            The kind of server-side encryption to apply to this stream.
            If you choose KMS, you can specify a KMS key via encryptionKey.
            If encryption key is not specified, a key will automatically be created.
            Default: - StreamEncryption.KMS if encrypted Streams are supported
            in the region or StreamEncryption.UNENCRYPTED otherwise.
            StreamEncryption.KMS if an encryption key is supplied through the encryptionKey property
        encryption_key: Optional[IKey] = None
            External KMS key to use for stream encryption. The 'encryption' property must be set to "Kms".
            Default: - Kinesis Data Streams master key ('/alias/aws/kinesis')
        retention_period: Optional[Duration] = None
            The number of hours for the data records that are stored in shards to remain accessible.
            Default: Duration.hours(24)
        shard_count: Optional[int] = None
            The number of shards for the stream. Can only be provided if streamMode is Provisioned.
            Default: 1
        stream_mode: Optional[StreamMode] = None
            The capacity mode of this stream.
            Default: StreamMode.PROVISIONED
        stream_name: Optional[str] = None
            Enforces a particular physical stream name.
            Default: A CloudFormation generated name
        **kinesis_props: Any
            Additional properties. For complete list of properties refer to CDK Documentation -
            Kinesis Data Stream:
            https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesis/Stream.html

        Returns
        -------
        data_stream: Stream
            A Kinesis Data Stream
        """
        # Load and validate the config
        kinesis_config_props: Dict[str, Any] = KinesisStreamsSchema().load(
            Config().get_resource_config(
                environment_id=environment_id,
                id=id,
            ),
            partial=["removal_policy"],
        )

        # Collect args. Extra keyword arguments are unpacked last, so on a key
        # clash they win over the named parameters.
        kinesis_props = {
            "encryption": encryption,
            "encryption_key": encryption_key,
            "retention_period": retention_period,
            "shard_count": shard_count,
            "stream_mode": stream_mode,
            "stream_name": stream_name,
            **kinesis_props,
        }

        # Explicit ("hardcoded") props should always take precedence over config.
        # Values left as None are skipped so they do not erase configured values.
        for key, value in kinesis_props.items():
            if value is not None:
                kinesis_config_props[key] = value

        # Create data stream from the MERGED properties. Passing only the
        # explicit arguments here would silently drop everything loaded from
        # ddk.json (e.g. `retention_period`, `shard_count`).
        _logger.debug(f"kinesis_props: {kinesis_config_props}")
        data_stream: Stream = Stream(scope, id, **kinesis_config_props)

        return data_stream
10 changes: 10 additions & 0 deletions core/aws_ddk_core/resources/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ def _deserialize(self, value: int, attr: Optional[str], data: Any, **kwargs: Any
raise ValidationError(f"`{attr}` must be an integer representing duration in seconds.") from error


class Size(fields.Field):
    """Field that deserializes an integer number of mebibytes (MiB) to a CDK Size."""

    def _deserialize(self, value: int, attr: Optional[str], data: Any, **kwargs: Any) -> cdk.Size:
        # A TypeError from the CDK call is reported as a schema validation
        # failure, keeping the original exception as the cause.
        try:
            size = cdk.Size.mebibytes(value)
        except TypeError as exc:
            raise ValidationError(f"`{attr}` must be an integer representing size in mebibytes.") from exc
        return size


class SubnetType(fields.Field):
"""Field that deserializes a string to a CDK EC2 SubnetType."""

Expand Down
Loading