Skip to content

Commit

Permalink
(core) add firehose delivery ingestion stage (#71)
Browse files Browse the repository at this point in the history
* adding initial primitive version of FirehoseFactory

* updating poetry

* working through destination specific config

* adding more destination props

* refactoring class

* Adding firehose to s3 stage

* adding escape hatches in stage and docstrings

* adding additional destination params

* adding some additional params to configure destination

* updating docstring and ordering for params

* updating naming

* adding firehose destination resource factory

* adding kinesis resource factory

* updating poetry deps

* adding data stream support

* adding alarms and updating docs

* handling review comments

* adding additional params for s3 destinations and default for compressions

* adding additional destination params

* updating naming and docs

* Minor - Renaming and standardizing

Co-authored-by: Abdel Jaidi <[email protected]>
  • Loading branch information
malachi-constant and jaidisido authored Apr 4, 2022
1 parent d51e232 commit c08d163
Show file tree
Hide file tree
Showing 14 changed files with 958 additions and 17 deletions.
4 changes: 4 additions & 0 deletions core/aws_ddk_core/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.

from aws_ddk_core.resources._glue import GlueFactory
from aws_ddk_core.resources._kinesis_firehose import KinesisFirehoseFactory
from aws_ddk_core.resources._kinesis_streams import KinesisStreamsFactory
from aws_ddk_core.resources._kms import KMSFactory
from aws_ddk_core.resources._lambda import LambdaFactory
from aws_ddk_core.resources._s3 import S3Factory
Expand All @@ -21,6 +23,8 @@

__all__ = [
"GlueFactory",
"KinesisFirehoseFactory",
"KinesisStreamsFactory",
"KMSFactory",
"LambdaFactory",
"S3Factory",
Expand Down
225 changes: 225 additions & 0 deletions core/aws_ddk_core/resources/_kinesis_firehose.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Any, Dict, Optional, Sequence

import aws_cdk.aws_kinesisfirehose_alpha as firehose
import aws_cdk.aws_kinesisfirehose_destinations_alpha as destinations
from aws_cdk.aws_iam import IRole
from aws_cdk.aws_kinesis import IStream
from aws_cdk.aws_kms import IKey
from aws_cdk.aws_logs import ILogGroup
from aws_cdk.aws_s3 import IBucket
from aws_ddk_core.config import Config
from aws_ddk_core.resources.commons import BaseSchema, Duration, Size
from constructs import Construct

_logger: logging.Logger = logging.getLogger(__name__)


class FirehoseDestinationSchema(BaseSchema):
    """Marshmallow schema for the Firehose destination settings that can be set through DDK configuration."""

    # Settings forwarded to the CDK Firehose destination construct.
    # `Duration` and `Size` are the custom DDK Marshmallow fields imported from `resources.commons`.
    buffering_interval = Duration()
    buffering_size = Size()


class KinesisFirehoseFactory:
    """
    Factory of Kinesis Data Firehose DDK resources: delivery streams and their S3 destinations.
    """

    @staticmethod
    def delivery_stream(
        scope: Construct,
        id: str,
        environment_id: str,
        destinations: Sequence[firehose.IDestination],
        delivery_stream_name: Optional[str] = None,
        encryption: Optional[firehose.StreamEncryption] = None,
        encryption_key: Optional[IKey] = None,
        role: Optional[IRole] = None,
        source_stream: Optional[IStream] = None,
        **firehose_props: Any,
    ) -> firehose.IDeliveryStream:
        """
        Build a Firehose delivery stream from the given arguments.

        Unlike the S3 destination, no ddk.json configuration is loaded here: the
        stream is built solely from the arguments passed by the caller.

        Parameters
        ----------
        scope : Construct
            Scope within which this construct is defined
        id : str
            Identifier of the delivery stream
        environment_id : str
            Identifier of the environment (accepted but not read by this method)
        destinations: Sequence[firehose.IDestination]
            The destinations that this delivery stream will deliver data to
        delivery_stream_name: Optional[str] = None
            A name for the delivery stream
        encryption: Optional[firehose.StreamEncryption] = None
            Indicates the type of customer master key (CMK) to use for server-side encryption, if any.
            Default: StreamEncryption.UNENCRYPTED
        encryption_key: Optional[IKey] = None
            Customer managed key to server-side encrypt data in the stream.
            Default: - no KMS key will be used
        role: Optional[IRole] = None
            The IAM role associated with this delivery stream.
            Assumed by Kinesis Data Firehose to read from sources and encrypt data server-side.
            Default: - a role will be created with default permissions.
        source_stream: Optional[IStream] = None
            The Kinesis data stream to use as a source for this delivery stream
        **firehose_props: Any
            Additional properties, passed through to the CDK construct unchanged.
            For complete list of properties refer to CDK Documentation - Firehose Delivery Stream:
            https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesisfirehose/DeliveryStream.html

        Returns
        -------
        delivery_stream: firehose.IDeliveryStream
            A Kinesis Firehose Delivery Stream
        """
        # Named arguments come first; the escape-hatch kwargs are appended after them
        stream_props: Dict[str, Any] = {
            "delivery_stream_name": delivery_stream_name,
            "destinations": destinations,
            "encryption": encryption,
            "encryption_key": encryption_key,
            "role": role,
            "source_stream": source_stream,
        }
        stream_props.update(firehose_props)

        _logger.debug(f"firehose_props: {stream_props}")
        return firehose.DeliveryStream(scope, id, **stream_props)

    @staticmethod
    def s3_destination(
        id: str,
        environment_id: str,
        bucket: IBucket,
        buffering_interval: Optional[Duration] = None,
        buffering_size: Optional[Size] = None,
        compression: Optional[destinations.Compression] = destinations.Compression.GZIP,
        data_output_prefix: Optional[str] = None,
        encryption_key: Optional[IKey] = None,
        error_output_prefix: Optional[str] = None,
        logging: Optional[bool] = True,
        log_group: Optional[ILogGroup] = None,
        processor: Optional[firehose.IDataProcessor] = None,
        role: Optional[IRole] = None,
        s3_backup: Optional[destinations.DestinationS3BackupProps] = None,
        **destination_props: Any,
    ) -> destinations.S3Bucket:
        """
        Build a Firehose S3 delivery destination.

        The resource configuration registered in ddk.json for `id` under `environment_id`
        is loaded and validated first (supported parameters: `buffering_interval` and
        `buffering_size`). Every explicitly passed argument that is not None then
        overrides the corresponding configuration value.

        Parameters
        ----------
        id : str
            Identifier of the destination
        environment_id : str
            Identifier of the environment
        bucket: IBucket
            S3 Bucket to use for the destination.
        buffering_interval: Optional[Duration] = None
            The length of time that Firehose buffers incoming data before delivering it to the S3 bucket.
            Minimum: Duration.seconds(60)
            Maximum: Duration.seconds(900)
            Default: Duration.seconds(300)
        buffering_size: Optional[Size] = None
            The size of the buffer that Kinesis Data Firehose uses for incoming data
            before delivering it to the S3 bucket.
            Minimum: Size.mebibytes(1)
            Maximum: Size.mebibytes(128)
            Default: Size.mebibytes(5)
        compression: Optional[Compression] = Compression.GZIP
            The type of compression that Kinesis Data Firehose uses to compress the data that it delivers
            to the Amazon S3 bucket.
        data_output_prefix: Optional[str] = None
            A prefix that Kinesis Data Firehose evaluates and adds to records before writing them to S3
        encryption_key: Optional[IKey] = None
            The AWS KMS key used to encrypt the data that it delivers to your Amazon S3 bucket.
        error_output_prefix: Optional[str] = None
            A prefix that Kinesis Data Firehose evaluates and adds to failed records before writing them to S3.
            This prefix appears immediately following the bucket name
        logging: Optional[bool] = True
            If true, log errors when data transformation or data delivery fails.
            If logGroup is provided, this will be implicitly set to true.
        log_group: Optional[ILogGroup] = None
            The CloudWatch log group where log streams will be created to hold error logs.
            Default: - if logging is set to true, a log group will be created for you.
        processor: Optional[IDataProcessor] = None
            The data transformation that should be performed on the data before writing to the destination.
        role: Optional[IRole] = None
            The IAM role associated with this destination.
            Assumed by Kinesis Data Firehose to invoke processors and write to destinations
        s3_backup: Optional[DestinationS3BackupProps] = None
            The configuration for backing up source records to S3.
        **destination_props: Any
            Additional properties. For complete list of properties refer to CDK Documentation -
            Firehose S3 Destinations:
            https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesisfirehose_destinations/S3Bucket.html

        Returns
        -------
        destination: destinations.S3Bucket
            A Kinesis Firehose S3 Delivery Destination
        """
        # Start from the validated ddk.json configuration of this resource
        schema = FirehoseDestinationSchema()
        resource_config = Config().get_resource_config(environment_id=environment_id, id=id)
        merged_props: Dict[str, Any] = schema.load(resource_config, partial=["removal_policy"])

        # Named arguments come first; the escape-hatch kwargs are appended after them
        explicit_props: Dict[str, Any] = {
            "buffering_interval": buffering_interval,
            "buffering_size": buffering_size,
            "compression": compression,
            "data_output_prefix": data_output_prefix,
            "encryption_key": encryption_key,
            "error_output_prefix": error_output_prefix,
            "logging": logging,
            "log_group": log_group,
            "processor": processor,
            "role": role,
            "s3_backup": s3_backup,
        }
        explicit_props.update(destination_props)

        # Explicit ("hardcoded") props always win over config; None means "not provided"
        merged_props.update({name: value for name, value in explicit_props.items() if value is not None})

        _logger.debug(f"firehose destination properties: {explicit_props}")
        return destinations.S3Bucket(bucket, **merged_props)
129 changes: 129 additions & 0 deletions core/aws_ddk_core/resources/_kinesis_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Any, Dict, Optional

from aws_cdk.aws_kinesis import IStream, Stream, StreamEncryption, StreamMode
from aws_cdk.aws_kms import IKey
from aws_ddk_core.config import Config
from aws_ddk_core.resources.commons import BaseSchema, Duration
from constructs import Construct
from marshmallow import fields

_logger: logging.Logger = logging.getLogger(__name__)


class KinesisStreamsSchema(BaseSchema):
    """Marshmallow schema for the Kinesis data stream settings that can be set through DDK configuration."""

    # `Duration` is the custom DDK Marshmallow field imported from `resources.commons`
    retention_period = Duration()
    # Plain Marshmallow integer field
    shard_count = fields.Int()


class KinesisStreamsFactory:
    """
    Class factory create and configure Kinesis DDK resources, including Data Streams.
    """

    @staticmethod
    def data_stream(
        scope: Construct,
        id: str,
        environment_id: str,
        encryption: Optional[StreamEncryption] = None,
        encryption_key: Optional[IKey] = None,
        retention_period: Optional[Duration] = None,
        shard_count: Optional[int] = None,
        stream_mode: Optional[StreamMode] = None,
        stream_name: Optional[str] = None,
        **kinesis_props: Any,
    ) -> IStream:
        """
        Create and configure Kinesis data stream.

        This construct allows to configure parameters of the Kinesis data stream using ddk.json
        configuration file depending on the `environment_id` in which the function is used.
        Supported parameters are: `retention_period` and `shard_count`.
        Explicitly passed arguments that are not None take precedence over the configuration.

        Parameters
        ----------
        scope : Construct
            Scope within which this construct is defined
        id : str
            Identifier of the data stream
        environment_id : str
            Identifier of the environment
        encryption: Optional[StreamEncryption] = None
            The kind of server-side encryption to apply to this stream.
            If you choose KMS, you can specify a KMS key via encryptionKey.
            If encryption key is not specified, a key will automatically be created.
            Default: - StreamEncryption.KMS if encrypted Streams are supported
            in the region or StreamEncryption.UNENCRYPTED otherwise.
            StreamEncryption.KMS if an encryption key is supplied through the encryptionKey property
        encryption_key: Optional[IKey] = None
            External KMS key to use for stream encryption. The 'encryption' property must be set to "Kms".
            Default: - Kinesis Data Streams master key ('/alias/aws/kinesis')
        retention_period: Optional[Duration] = None
            The number of hours for the data records that are stored in shards to remain accessible.
            Default: Duration.seconds(3600)
            NOTE(review): default value carried over from the original docs - confirm against CDK/DDK defaults.
        shard_count: Optional[int] = None
            The number of shards for the stream. Can only be provided if streamMode is Provisioned.
            Default: 1
        stream_mode: Optional[StreamMode] = None
            The capacity mode of this stream.
            Default: StreamMode.PROVISIONED
        stream_name: Optional[str] = None
            Enforces a particular physical stream name.
            Default: A CloudFormation generated name
        **kinesis_props: Any
            Additional properties. For complete list of properties refer to CDK Documentation -
            Kinesis Data Stream:
            https://docs.aws.amazon.com/cdk/api/v1/python/aws_cdk.aws_kinesis/Stream.html

        Returns
        -------
        data_stream: Stream
            A Kinesis Data Stream
        """
        # Load and validate the config
        kinesis_config_props: Dict[str, Any] = KinesisStreamsSchema().load(
            Config().get_resource_config(
                environment_id=environment_id,
                id=id,
            ),
            partial=["removal_policy"],
        )

        # Collect args
        kinesis_props = {
            "encryption": encryption,
            "encryption_key": encryption_key,
            "retention_period": retention_period,
            "shard_count": shard_count,
            "stream_mode": stream_mode,
            "stream_name": stream_name,
            **kinesis_props,
        }

        # Explicit ("hardcoded") props should always take precedence over config
        for key, value in kinesis_props.items():
            if value is not None:
                kinesis_config_props[key] = value

        # Create the data stream from the MERGED properties. Passing only the explicit
        # arguments here would silently drop every value that came from ddk.json.
        _logger.debug(f"kinesis_props: {kinesis_config_props}")
        data_stream: Stream = Stream(scope, id, **kinesis_config_props)

        return data_stream
10 changes: 10 additions & 0 deletions core/aws_ddk_core/resources/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ def _deserialize(self, value: int, attr: Optional[str], data: Any, **kwargs: Any
raise ValidationError(f"`{attr}` must be an integer representing duration in seconds.") from error


class Size(fields.Field):
    """Marshmallow field that turns an integer number of mebibytes (MiB) into a CDK Size."""

    def _deserialize(self, value: int, attr: Optional[str], data: Any, **kwargs: Any) -> cdk.Size:
        """
        Convert the raw configuration value into a `cdk.Size`.

        Raises a `ValidationError` (chained to the original error) when
        `cdk.Size.mebibytes` rejects the value with a `TypeError`.
        """
        try:
            size = cdk.Size.mebibytes(value)
        except TypeError as error:
            message = f"`{attr}` must be an integer representing size in mebibytes."
            raise ValidationError(message) from error
        else:
            return size


class SubnetType(fields.Field):
"""Field that deserializes a string to a CDK EC2 SubnetType."""

Expand Down
Loading

0 comments on commit c08d163

Please sign in to comment.