Commit
WIP - Add Kinesis KCL Source
aserrallerios committed Oct 24, 2017
1 parent 31e5f0f commit bd82289
Showing 13 changed files with 632 additions and 8 deletions.
43 changes: 40 additions & 3 deletions docs/src/main/paradox/kinesis.md
@@ -1,6 +1,6 @@
# AWS Kinesis Connector

The AWS Kinesis connector provides an Akka Stream Source for consuming Kinesis Stream records.
The AWS Kinesis connector provides Akka Stream Sources for consuming and producing Kinesis Stream records.

For more information about Kinesis please visit the [official documentation](https://aws.amazon.com/documentation/kinesis/).

@@ -66,5 +66,42 @@ Scala
Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #list }

The constructed `Source` will return [Record](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html)
objects by calling [GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) at the specified interval and according to the downstream demand.
The constructed `Source` will return [Record](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html) objects by calling [GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) at the specified interval and according to the downstream demand.

# AWS KCL Worker Source & checkpointer

The KCL Source can read from several shards and rebalance automatically when other Workers are started or stopped. It also handles record sequence checkpoints.

For more information about KCL please visit the [official documentation](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html).

## Usage

The KCL Worker Source needs to create and manage Worker instances in order to consume records from Kinesis Streams.

In order to use it, you need to provide a Worker builder and the Source settings:

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #worker-settings }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #worker-settings }
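As a rough sketch of what such a builder and settings might look like (the application name, stream name, and worker id are illustrative placeholders; `Worker.Builder` and `KinesisClientLibConfiguration` come from the amazon-kinesis-client library, which this connector wraps):

```scala
import scala.concurrent.duration._

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}

import akka.stream.alpakka.kinesis.KinesisWorkerSourceSettings

// Hypothetical application/stream/worker names -- replace with your own.
val workerBuilder: IRecordProcessorFactory => Worker = recordProcessorFactory =>
  new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(
      new KinesisClientLibConfiguration(
        "my-app",
        "my-stream",
        new DefaultAWSCredentialsProviderChain(),
        "my-worker-id"
      )
    )
    .build()

// Buffer up to 1000 records in the stage and check the Worker's health
// every minute (these match the defaults in KinesisWorkerSourceSettings).
val settings = KinesisWorkerSourceSettings(bufferSize = 1000, checkWorkerPeriodicity = 1.minute)
```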

The Source also needs an `ExecutionContext` to run the Worker's thread and to execute record checkpoints. Then the Source can be created as usual:

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #worker-source }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #worker-source }
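Putting this together, the Source could be materialized roughly as follows (a sketch: `workerBuilder` and `settings` stand for the values prepared in the `#worker-settings` snippet above, and the thread-pool size is illustrative):

```scala
import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.kinesis.scaladsl.KinesisWorker

implicit val system: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()

// The KCL Worker blocks the thread it runs on, so give the Source a
// dedicated executor instead of the default Akka dispatcher.
implicit val workerExecutor: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

// Emits akka.stream.alpakka.kinesis.worker.CommittableRecord elements.
val source = KinesisWorker(workerBuilder, settings)
```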

## Committing records

The KCL Worker Source emits messages downstream that can be committed in order to track, per shard, how far the consumer has progressed. This can be done manually or with the provided checkpointer Flow.

To use the Flow, you can provide additional settings:

Scala
: @@snip (../../../../kinesis/src/test/scala/akka/stream/alpakka/kinesis/scaladsl/Examples.scala) { #checkpoint }

Java
: @@snip (../../../../kinesis/src/test/java/akka/stream/alpakka/kinesis/javadsl/Examples.java) { #checkpoint }
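A minimal sketch of wiring the checkpointer in (batch values are illustrative; `source` stands for the KinesisWorker Source created in the previous section):

```scala
import scala.concurrent.duration._

import akka.stream.alpakka.kinesis.KinesisWorkerCheckpointSettings
import akka.stream.alpakka.kinesis.scaladsl.KinesisWorker
import akka.stream.scaladsl.Sink

// Checkpoint at most every 100 records, or every 30 seconds, per shard.
val checkpointSettings =
  KinesisWorkerCheckpointSettings(maxBatchSize = 100, maxBatchWait = 30.seconds)

// Records pass through the Flow after being checkpointed.
source
  .via(KinesisWorker.checkpointRecordsFlow(checkpointSettings))
  .runWith(Sink.ignore)

// Alternatively, when the records are not needed further downstream:
source.to(KinesisWorker.checkpointRecordsSink(checkpointSettings)).run()
```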
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import scala.util.control.NoStackTrace

object KinesisErrors {

  sealed trait KinesisSourceError extends NoStackTrace
  case object NoShardsError extends KinesisSourceError
  case object GetShardIteratorError extends KinesisSourceError
  case object GetRecordsError extends KinesisSourceError

  sealed trait KinesisWorkerSourceError extends NoStackTrace
  case object WorkerUnexpectedShutdown extends KinesisWorkerSourceError

}
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import scala.concurrent.duration._

case class KinesisWorkerSourceSettings(bufferSize: Int, checkWorkerPeriodicity: FiniteDuration) {
  require(
    bufferSize >= 1,
    "Buffer size must be greater than 0; use size 1 to disable stage buffering"
  )
}

case class KinesisWorkerCheckpointSettings(maxBatchSize: Int, maxBatchWait: FiniteDuration) {
  require(
    maxBatchSize >= 1,
    "Batch size must be greater than 0; use size 1 to commit records one at a time"
  )
}

object KinesisWorkerSourceSettings {

  val defaultInstance = KinesisWorkerSourceSettings(1000, 1.minute)

  /**
   * Java API
   */
  def create(bufferSize: Int, checkWorkerPeriodicity: FiniteDuration): KinesisWorkerSourceSettings =
    KinesisWorkerSourceSettings(bufferSize, checkWorkerPeriodicity)

}

object KinesisWorkerCheckpointSettings {

  val defaultInstance = KinesisWorkerCheckpointSettings(1000, 10.seconds)

  /**
   * Java API
   */
  def create(maxBatchSize: Int, maxBatchWait: FiniteDuration): KinesisWorkerCheckpointSettings =
    KinesisWorkerCheckpointSettings(maxBatchSize, maxBatchWait)

}
@@ -0,0 +1,80 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import java.util
import java.util.concurrent.Semaphore

import akka.stream.alpakka.kinesis.KinesisErrors.WorkerUnexpectedShutdown
import akka.stream.alpakka.kinesis.worker.{CommittableRecord, IRecordProcessor}
import akka.stream.stage._
import akka.stream.{Attributes, Outlet, SourceShape}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

class KinesisWorkerSourceStage(
    bufferSize: Int,
    checkWorkerPeriodicity: FiniteDuration,
    workerBuilder: IRecordProcessorFactory => Worker
)(implicit executor: ExecutionContext)
    extends GraphStage[SourceShape[CommittableRecord]] {

  private val out: Outlet[CommittableRecord] = Outlet("records")
  override val shape: SourceShape[CommittableRecord] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with StageLogging {
      private val buffer = new util.ArrayDeque[CommittableRecord]()
      // We're transmitting backpressure to the worker with a semaphore instance
      // semaphore.acquire ~> callback ~> push downstream ~> semaphore.release
      private val semaphore = new Semaphore(bufferSize)
      private var hasBeenShutdown = false

      private[KinesisWorkerSourceStage] var worker: Worker = _

      private def tryToProduce(): Unit =
        if (!buffer.isEmpty && isAvailable(out)) {
          push(out, buffer.poll())
          semaphore.release()
        }

      override protected def onTimer(timerKey: Any): Unit =
        if (worker.hasGracefulShutdownStarted && !hasBeenShutdown) {
          failStage(WorkerUnexpectedShutdown)
        }

      override def preStart(): Unit = {
        val callback = getAsyncCallback[CommittableRecord] { record =>
          buffer.offer(record)
          tryToProduce()
        }
        worker = workerBuilder(
          new IRecordProcessorFactory {
            override def createProcessor(): IRecordProcessor =
              new IRecordProcessor(record => {
                semaphore.acquire()
                callback.invoke(record)
              })
          }
        )
        schedulePeriodically("check-worker-shutdown", checkWorkerPeriodicity)
        executor.execute(worker)
      }

      override def postStop(): Unit = {
        hasBeenShutdown = true
        buffer.clear()
        Future(worker.shutdown())
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          tryToProduce()
      })
    }

}
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis.javadsl

import java.util.concurrent.Executor

import akka.NotUsed
import akka.stream.alpakka.kinesis.worker.CommittableRecord
import akka.stream.alpakka.kinesis.{scaladsl, _}
import akka.stream.javadsl.{Flow, Sink, Source}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.model.Record

import scala.concurrent.ExecutionContext

object KinesisWorker {

  abstract class WorkerBuilder {
    def build(r: IRecordProcessorFactory): Worker
  }

  def create(
      workerBuilder: WorkerBuilder,
      settings: KinesisWorkerSourceSettings,
      workerExecutor: Executor
  ): Source[CommittableRecord, NotUsed] =
    scaladsl.KinesisWorker
      .apply(workerBuilder.build, settings)(ExecutionContext.fromExecutor(workerExecutor))
      .asJava

  def create(
      workerBuilder: WorkerBuilder,
      workerExecutor: Executor
  ): Source[CommittableRecord, NotUsed] =
    create(workerBuilder, KinesisWorkerSourceSettings.defaultInstance, workerExecutor)

  def checkpointRecordsFlow(
      settings: KinesisWorkerCheckpointSettings
  ): Flow[CommittableRecord, Record, NotUsed] =
    scaladsl.KinesisWorker.checkpointRecordsFlow(settings).asJava

  def checkpointRecordsFlow(): Flow[CommittableRecord, Record, NotUsed] =
    checkpointRecordsFlow(KinesisWorkerCheckpointSettings.defaultInstance)

  def checkpointRecordsSink(
      settings: KinesisWorkerCheckpointSettings
  ): Sink[CommittableRecord, NotUsed] =
    scaladsl.KinesisWorker.checkpointRecordsSink(settings).asJava

  def checkpointRecordsSink(): Sink[CommittableRecord, NotUsed] =
    checkpointRecordsSink(KinesisWorkerCheckpointSettings.defaultInstance)
}
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis.scaladsl

import akka.NotUsed
import akka.stream.Supervision.{Resume, Stop}
import akka.stream.alpakka.kinesis.worker.CommittableRecord
import akka.stream.alpakka.kinesis.{
KinesisWorkerCheckpointSettings,
KinesisWorkerSourceSettings,
KinesisWorkerSourceStage
}
import akka.stream.{scaladsl, ActorAttributes, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source, Zip}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.model.Record

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

object KinesisWorker {

  def apply(
      workerBuilder: IRecordProcessorFactory => Worker,
      settings: KinesisWorkerSourceSettings = KinesisWorkerSourceSettings.defaultInstance
  )(implicit workerExecutor: ExecutionContext): Source[CommittableRecord, NotUsed] =
    Source.fromGraph(new KinesisWorkerSourceStage(settings.bufferSize, settings.checkWorkerPeriodicity, workerBuilder))

  def checkpointRecordsFlow(
      settings: KinesisWorkerCheckpointSettings = KinesisWorkerCheckpointSettings.defaultInstance
  ): Flow[CommittableRecord, Record, NotUsed] =
    Flow[CommittableRecord]
      .groupBy(MAX_KINESIS_SHARDS, _.shardId)
      .groupedWithin(settings.maxBatchSize, settings.maxBatchWait)
      .via(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val `{` = b.add(scaladsl.Broadcast[immutable.Seq[CommittableRecord]](2))
        val `}` = b.add(Zip[Unit, immutable.Seq[CommittableRecord]])
        val `=` = b.add(Flow[Record])

        `{`.out(0)
          .map(_.max)
          .mapAsync(1)(r => if (r.canBeCheckpointed()) r.checkpoint() else Future.successful(())) ~> `}`.in0
        `{`.out(1) ~> `}`.in1

        `}`.out.map(_._2).mapConcat(identity).map(_.record) ~> `=`

        FlowShape(`{`.in, `=`.out)
      })
      .mergeSubstreams
      .withAttributes(ActorAttributes.supervisionStrategy {
        case _: com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException => Resume
        case _ => Stop
      })

  def checkpointRecordsSink(
      settings: KinesisWorkerCheckpointSettings = KinesisWorkerCheckpointSettings.defaultInstance
  ): Sink[CommittableRecord, NotUsed] =
    checkpointRecordsFlow(settings).to(Sink.ignore)

  // http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
  private val MAX_KINESIS_SHARDS = 500

}
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis.worker

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.model.Record

import scala.concurrent.{ExecutionContext, Future}

class CommittableRecord(
    val shardId: String,
    val recordProcessorStartingSequenceNumber: ExtendedSequenceNumber,
    val millisBehindLatest: Long,
    val record: Record,
    recordProcessor: IRecordProcessor,
    checkpointer: IRecordProcessorCheckpointer
)(implicit executor: ExecutionContext) {

  val sequenceNumber: String = record.getSequenceNumber

  def recordProcessorShutdownReason(): Option[ShutdownReason] =
    recordProcessor.shutdown
  def canBeCheckpointed(): Boolean =
    recordProcessorShutdownReason().isEmpty
  def checkpoint(): Future[Unit] =
    Future(checkpointer.checkpoint(record))

}

object CommittableRecord {

  implicit val orderBySequenceNumber: Ordering[CommittableRecord] = Ordering.by(_.sequenceNumber)

}