makkan/aws-kinesis-spring-boot-starter

aws-kinesis-spring-boot-starter

Dependencies

  • Spring Boot 2.1.0 or higher
  • Jackson

Notes

This library is written in Kotlin. However, it's perfectly compatible with Java. You will find examples for both languages below.

Installation

Add the following dependency to your project:

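repositories {
    ...
    maven { url 'https://jitpack.io' }
}
dependencies {
    // "implementation" replaces the "compile" configuration, which newer Gradle versions no longer support
    implementation "com.github.bringmeister:aws-kinesis-spring-boot-starter:+"
}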

Note: See the project's releases for the latest version available.

Configuration

In order to use this library you need to configure some properties in your application.yml. The following shows the minimal required configuration. This configuration will allow you to send and receive messages.

aws:
  kinesis:
    region: eu-central-1
    consumer-group: example-service
    aws-account-id: "000000000000"
    iam-role-to-assume: ExampleKinesisRole
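
The aws-account-id and iam-role-to-assume settings together identify an IAM role. IAM role ARNs follow the pattern arn:aws:iam::<account-id>:role/<role-name>, and the sketch below builds one from the two values. That the starter combines the two settings in exactly this way is an assumption, and RoleArnSketch is a hypothetical name used only for illustration. The second statement in main hints at why the account ID is quoted in the YAML: read as a number, an ID such as 000000000000 would lose its leading zeros.

```java
final class RoleArnSketch {

    // Builds an IAM role ARN from an account ID and a role name.
    // The ARN layout is defined by AWS; using it here for the starter's settings is an assumption.
    static String roleArn(String awsAccountId, String iamRoleToAssume) {
        if (!awsAccountId.matches("\\d{12}")) {
            throw new IllegalArgumentException("AWS account IDs have exactly 12 digits: " + awsAccountId);
        }
        return "arn:aws:iam::" + awsAccountId + ":role/" + iamRoleToAssume;
    }

    public static void main(String[] args) {
        System.out.println(roleArn("000000000000", "ExampleKinesisRole"));
        // Treating the ID as a number drops the leading zeros.
        System.out.println(Long.parseLong("000000000000"));
    }
}
```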

Configuration Guide

Local development

Before your application goes live, you typically want to develop and test your code locally. We use Docker for this. You will find a docker-compose.yml file in the root of this project. Run docker-compose up to start Kinesis (and DynamoDB).

The configuration for local development looks like this:

aws:
  kinesis:
    region: local
    kinesis-url: http://localhost:14567
    consumer-group: example-service
    aws-account-id: "222222222222"
    iam-role-to-assume: ExampleKinesisRole
    create-streams: true
    dynamo-db-settings:
      url: http://localhost:14568

Any stream used in your application will be created the first time it is used if it does not exist yet.

Also, you must enable a Spring profile (kinesis-local):

spring:
  profiles:
    include: kinesis-local

See JavaListenerTest.java and KotlinListenerTest.kt for running examples. Both tests use the same Docker images to send and receive messages.

Creating streams automatically

You can create streams automatically by turning the create-streams flag on:

aws:
  kinesis:
    ...
    create-streams: true

By default, create-streams is turned off and streams must be created externally.

Validate data sent and received

By default, this starter automatically validates all data sent and received if a bean of type javax.validation.Validator is found. This feature can be disabled by setting the validate flag to false:

aws:
  kinesis:
    ...
    validate: false

By default, validate is turned on.

Starter Metrics

This starter reports metrics about records sent and received when io.micrometer:micrometer-core is found on the classpath. This feature can be disabled by setting the metrics flag to false:

aws:
  kinesis:
    ...
    metrics: false

The following metrics are recorded and tagged with stream and exception (default None):

  • aws.kinesis.starter.inbound: Duration of calls to KinesisInboundHandler.handleRecord (+ tag retry)
  • aws.kinesis.starter.outbound: Duration of calls to KinesisOutboundStream.send

By default, metrics is turned on.

KCL Metrics

KCL's own metrics can be reported to one of several systems by setting aws.kinesis.streams[*].metrics-driver to one of the following values:

  • DEFAULT: Export metrics with KCL.
  • LOGGING: Log metrics.
  • MICROMETER: Report metrics using Spring's built-in Micrometer.
  • NONE: Disables KCL metrics.

Additionally, the metrics level can be adjusted by setting aws.kinesis.streams[*].metrics-level to one of the following values:

  • NONE
  • SUMMARY
  • DETAILED

aws:
  kinesis:
    ...
    streams:
      - stream-name: some-stream-name
        metrics-level: DETAILED
        metrics-driver: MICROMETER

If nothing is specified explicitly, the DEFAULT metrics driver and the NONE metrics level are used.

Disable CBOR

Occasionally, KCL will try to treat JSON responses from AWS as CBOR, causing exceptions during initialization such as: Illegal length for VALUE_STRING: 2473435388096836386.

Additionally, during integration tests CBOR might have to be disabled due to insufficient support by test containers.

To disable CBOR globally for the AWS SDK, set aws.kinesis.disable-cbor: true. The default is false.

Note: This setting applies globally to the whole AWS SDK.
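
For reference, the AWS SDK for Java 1.x reads the JVM system property com.amazonaws.sdk.disableCbor to switch CBOR off. The sketch below sets that property by hand, which can be handy in a test setup. Whether the starter uses exactly this mechanism for disable-cbor is an assumption, and DisableCborSketch is a hypothetical name.

```java
final class DisableCborSketch {

    // System property read by the AWS SDK for Java 1.x to switch off CBOR.
    static final String DISABLE_CBOR_PROPERTY = "com.amazonaws.sdk.disableCbor";

    // Intended to run early, before AWS clients are created.
    // The property is JVM-wide, so it affects every AWS client in the process.
    static void disableCbor() {
        System.setProperty(DISABLE_CBOR_PROPERTY, "true");
    }

    public static void main(String[] args) {
        disableCbor();
        System.out.println(DISABLE_CBOR_PROPERTY + "=" + System.getProperty(DISABLE_CBOR_PROPERTY));
    }
}
```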

MDC

By default, this starter adds metadata to the MDC when processing records. This feature can be disabled by setting the mdc.enabled flag to false:

aws:
  kinesis:
    ...
    mdc:
      enabled: false

Use the following properties to change the name of an individual MDC property or to omit it completely:

  • aws.kinesis.mdc.stream-name-property
  • aws.kinesis.mdc.shard-id-property
  • aws.kinesis.mdc.sequence-number-property
  • aws.kinesis.mdc.partition-key-property (null by default)

Setting the property to null omits it from MDC.
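
The "null omits the property" rule can be pictured with the following sketch. It is not the starter's code: a plain map stands in for the logging framework's MDC, and the class name, method names, and the property names "stream", "shard" and "sequence" are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MdcPropertiesSketch {

    // Stores the value under the configured property name, or omits it when the name is null.
    static void putIfNamed(Map<String, String> mdc, String propertyName, String value) {
        if (propertyName != null) {
            mdc.put(propertyName, value);
        }
    }

    // Builds the context for one record. A plain map stands in for the real MDC.
    static Map<String, String> contextFor(String streamNameProperty, String shardIdProperty,
                                          String sequenceNumberProperty, String partitionKeyProperty,
                                          String streamName, String shardId,
                                          String sequenceNumber, String partitionKey) {
        Map<String, String> mdc = new LinkedHashMap<>();
        putIfNamed(mdc, streamNameProperty, streamName);
        putIfNamed(mdc, shardIdProperty, shardId);
        putIfNamed(mdc, sequenceNumberProperty, sequenceNumber);
        putIfNamed(mdc, partitionKeyProperty, partitionKey);
        return mdc;
    }

    public static void main(String[] args) {
        // Hypothetical property names; the partition key property is null and therefore omitted.
        System.out.println(contextFor("stream", "shard", "sequence", null,
                "foo-stream", "shardId-000000000000", "1", "key-1"));
    }
}
```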

Configuring initial position in stream

You can use one of the following values:

  • LATEST: Start after the most recent data record (fetch new data).
  • TRIM_HORIZON: Start from the oldest available data record.

aws:
  kinesis:
    ...
    streams:
      - stream-name: some-stream-name
        initial-position-in-stream: TRIM_HORIZON

If you don't specify anything, LATEST is used by default.
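
The difference between the two values can be illustrated with a small model. This is a simplified sketch, not KCL code: InitialPositionSketch and visibleRecords are hypothetical names, and a shard is modelled as two lists, the records already retained when the consumer starts and the records arriving afterwards. In KCL the initial position only matters when no checkpoint exists yet for the consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InitialPositionSketch {

    enum InitialPosition { LATEST, TRIM_HORIZON }

    // Returns the records a fresh consumer (no checkpoint yet) would get to see.
    static List<String> visibleRecords(InitialPosition position,
                                       List<String> retained, List<String> arriving) {
        List<String> visible = new ArrayList<>();
        if (position == InitialPosition.TRIM_HORIZON) {
            visible.addAll(retained);   // start from the oldest record still available
        }
        visible.addAll(arriving);       // both positions see records that arrive later
        return visible;
    }

    public static void main(String[] args) {
        List<String> retained = Arrays.asList("old-1", "old-2");
        List<String> arriving = Arrays.asList("new-1");
        System.out.println(visibleRecords(InitialPosition.LATEST, retained, arriving));
        System.out.println(visibleRecords(InitialPosition.TRIM_HORIZON, retained, arriving));
    }
}
```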

Configuring stream accounts and roles

You can configure listeners to use a dedicated role and account for a stream.

aws:
  kinesis:
    ...
    streams:
      - stream-name: my-special-stream
        aws-account-id: "111111111111"
        iam-role-to-assume: SpecialKinesisRole

Configure handler retries

You can configure the retry mechanism for your event handlers like this:

aws:
  kinesis:
    ...
    handler:
      retry:
        maxRetries: 5
        backoff: 100ms
    ...

Max Retries:

  • 0: No retries
  • >=1: Finite number of retry attempts
  • -1: Infinite retries

By default, each record is passed to your handler exactly once after deserialization (maxRetries=0). When an exception occurs, the record is skipped and the next one is processed. If you want failed records to be retried automatically, you can retry either infinitely (maxRetries=-1) or up to the specified number of attempts (maxRetries=n). Errors during deserialization, on the other hand, are never retried, as it makes no sense to do so. This behaviour prevents a "poison pill" from killing the consumer. Such malformed messages are logged and skipped.
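
The semantics of the three maxRetries cases can be sketched as a simple loop. This is an illustration under stated assumptions, not the starter's implementation: HandlerRetrySketch and process are hypothetical names, and the backoff is modelled as a fixed sleep between attempts.

```java
import java.util.function.Consumer;

final class HandlerRetrySketch {

    // Calls the handler until it succeeds or the retries are used up.
    // Returns the number of handler invocations; after the last failure the record is skipped.
    static int process(String record, Consumer<String> handler, int maxRetries, long backoffMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.accept(record);
                return attempts;                        // handled successfully
            } catch (RuntimeException e) {
                // -1 means retry forever; otherwise allow maxRetries retries after the first attempt.
                boolean retriesLeft = maxRetries == -1 || attempts <= maxRetries;
                if (!retriesLeft) {
                    return attempts;                    // give up and skip the record
                }
                try {
                    Thread.sleep(backoffMillis);        // wait before the next attempt
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical handler that fails twice before it succeeds.
        Consumer<String> flaky = record -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("temporary failure");
            }
        };
        System.out.println("attempts: " + process("record-1", flaky, 5, 100));
    }
}
```

With maxRetries=0 a failing handler is invoked once, and with maxRetries=5 at most six times (one initial attempt plus five retries).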

Configure checkpointing

Checkpointing is the process of storing the sequence number of the last processed Kinesis record in a DynamoDB table. It can be configured like this:

aws:
  kinesis:
    ...
    checkpointing:
      strategy: RECORD
      retry:
        maxRetries: 23
        backoff: 1s
    ...

Checkpointing strategy:

  • RECORD: Checkpoint after each record of a batch
  • BATCH: Checkpoint only once after the whole batch of records was processed

By default the BATCH strategy is used and will checkpoint only after a whole batch of records is processed.
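
To make the difference concrete, the sketch below records which sequence numbers would be checkpointed for one batch under each strategy. It is a simplified model with hypothetical names (CheckpointStrategySketch, processBatch), not the starter's code, and it collects the checkpoints in a list instead of writing them to DynamoDB.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CheckpointStrategySketch {

    enum Strategy { RECORD, BATCH }

    // Processes one batch and returns the sequence numbers that would be checkpointed, in order.
    static List<String> processBatch(List<String> sequenceNumbers, Strategy strategy) {
        List<String> checkpoints = new ArrayList<>();
        for (String sequenceNumber : sequenceNumbers) {
            // ... the record would be passed to the handler here ...
            if (strategy == Strategy.RECORD) {
                checkpoints.add(sequenceNumber);        // checkpoint after every record
            }
        }
        if (strategy == Strategy.BATCH && !sequenceNumbers.isEmpty()) {
            // checkpoint once, at the last record of the batch
            checkpoints.add(sequenceNumbers.get(sequenceNumbers.size() - 1));
        }
        return checkpoints;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("seq-1", "seq-2", "seq-3");
        System.out.println("RECORD: " + processBatch(batch, Strategy.RECORD));
        System.out.println("BATCH:  " + processBatch(batch, Strategy.BATCH));
    }
}
```

As a rule of thumb, RECORD means more writes to the checkpoint table but fewer records to reprocess after a crash, while BATCH means fewer writes at the cost of possibly reprocessing a whole batch.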

Configure producers

You can configure producers in order to use a dedicated role and account for a stream.

aws:
  kinesis:
    ...
    producer:
      - stream-name: my-special-stream
        aws-account-id: "111111111111"
        iam-role-to-assume: SpecialKinesisConsumer

Specify credentials per role

AWS credentials are resolved using AWS' DefaultAWSCredentialsProviderChain. It is possible to override credentials on a per-role basis as follows:

aws:
  kinesis:
    role-credentials:
      - iam-role-to-assume: <IAM_ROLE>
        aws-account-id: <ACCOUNT_ID>
        access-key: "xxx"
        secret-key: "yyy"
      - ...

Disable automatic registration of @KinesisListener

Automatic registration of @KinesisListener-annotated methods can be disabled.

aws:
  kinesis:
    listener:
      disabled: true

Usage

Publishing messages

Inject the AwsKinesisOutboundGateway wherever you like and pass the stream name and a Record containing the data (the actual payload) and the metadata to the send() method.

Java example:

@Service
public class MyService {

    private final AwsKinesisOutboundGateway gateway;

    public MyService(AwsKinesisOutboundGateway gateway) {
        this.gateway = gateway;
    }

    public void sendMyMessage() {
        Record record = new Record(new MyMessage("my content"), new MyMetadata("my metadata"));
        gateway.send("my-stream", record); 
    }
}

See JavaListenerTest.java for an example.

Kotlin example:

@Service
class MyService(private val gateway: AwsKinesisOutboundGateway) {
    fun sendMyMessage() {        
        val record = Record(MyMessage("my content"), MyMetadata("my metadata"))
        gateway.send("my-stream", record)
    }
}

See KotlinListenerTest.kt for an example.

The event will be marshalled as JSON using Jackson and sent to the Kinesis stream using the credentials defined in the application.yml.

{
    "data":"my content",
    "metadata":"my metadata"
}

Consuming messages

To consume messages, annotate your listener method with the @KinesisListener annotation. Your class must be a Spring bean annotated with @Service or @Component; it will be picked up automatically and registered as a listener. The listener method accepts the data as its first argument and, optionally, the metadata as a second argument. The argument types are user-defined. By default, the application context's configured ObjectMapper is used to deserialize stream events of the format {"data": <any>, "meta": <any>} into the types defined for each argument.

Java example:

@Service
public class MyKinesisListener {

    @KinesisListener(stream = "foo-stream")
    public void handle(MyData data, MyMetadata metadata) {
        System.out.println(data + ", " + metadata);
    }
}

See JavaListenerTest.java for an example.

Kotlin example:

@Service
class MyKinesisListener {

    @KinesisListener(stream = "foo-stream")
    fun handle(data: MyData, metadata: MyMetadata) = println("$data, $metadata")
}

See KotlinListenerTest.kt for an example.

Consume as batch

To enable batch consuming, change the method signature to accept a Map instead of separate data and metadata arguments. The map contains the data as key and the metadata as value. Because of Java generics and type erasure, the types of the data and metadata must be set explicitly in the annotation, using the two fields dataClass and metaClass. These types must match the type parameters of the map.

Java example:

import java.util.Map;
import java.util.stream.Collectors;

@Service
public class MyKinesisListener {

    @KinesisListener(stream = "foo-stream", dataClass = MyData.class, metaClass = MyMetadata.class)
    public void handle(Map<MyData, MyMetadata> events) {
        // Print every data/metadata pair of the batch.
        System.out.println(events.entrySet().stream()
                                            .map(e -> e.getKey() + ", " + e.getValue())
                                            .collect(Collectors.joining()));
    }
}

See JavaListenerTest.java for an example.

Kotlin example:

@Service
class MyKinesisListener {

    @KinesisListener(stream = "foo-stream", dataClass = MyData::class, metaClass = MyMetadata::class)
    fun handle(events: Map<MyData, MyMetadata>) = println(events.map { "${it.key}, ${it.value}" }.joinToString())
}

See KotlinListenerTest.kt for an example.

Developer Guide

We're using the official Kotlin Style Guide to format our code. Follow the link below for more information and instructions on how to configure the IntelliJ formatter according to this style guide.

More:
