Skip to content

Commit

Permalink
WIP - Add Kinesis KCL Source
Browse files Browse the repository at this point in the history
  • Loading branch information
aserrallerios committed Aug 7, 2017
1 parent 78726bd commit 67bb52a
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.model.Record

class CommittableRecord(val shardId: String, val record: Record, checkpointer: IRecordProcessorCheckpointer) {
val sequenceNumber = record.getSequenceNumber
def checkpoint(): Unit = checkpointer.checkpoint(record)
}

object CommittableRecord {

implicit val order: Ordering[CommittableRecord] = Ordering.by(_.sequenceNumber)

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
*/
package akka.stream.alpakka.kinesis

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason

import scala.util.control.NoStackTrace

object KinesisSourceErrors {
object KinesisErrors {

sealed trait KinesisSourceError extends NoStackTrace
case object NoShardsError extends KinesisSourceError
case object GetShardIteratorError extends KinesisSourceError
case object GetRecordsError extends KinesisSourceError

sealed trait KinesisWorkerSourceErrors extends NoStackTrace
case class UnexpectedShutdown(e: ShutdownReason) extends KinesisWorkerSourceErrors

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.util.concurrent.Future

import akka.actor.ActorRef
import akka.stream.alpakka.kinesis.KinesisSourceStage._
import akka.stream.alpakka.kinesis.{KinesisSourceErrors => Errors}
import akka.stream.alpakka.kinesis.{KinesisErrors => Errors}
import akka.stream.stage.GraphStageLogic.StageActor
import akka.stream.stage._
import akka.stream.{Attributes, Outlet, SourceShape}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import scala.concurrent.duration._

case class KinesisWorkerSourceSettings(bufferSize: Int)
case class KinesisWorkerCheckpointSettings(maxBatchSize: Int, maxBatchWait: FiniteDuration)

object KinesisWorkerSourceSettings {

val defaultInstance = KinesisWorkerSourceSettings(1000)

}

object KinesisWorkerCheckpointSettings {

val defaultInstance = KinesisWorkerCheckpointSettings(1000, 10.seconds)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis

import java.util
import java.util.concurrent.Semaphore

import akka.stream.alpakka.kinesis.KinesisErrors.UnexpectedShutdown
import akka.stream.stage._
import akka.stream.{Attributes, Outlet, SourceShape}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{
IRecordProcessor,
IRecordProcessorCheckpointer,
IRecordProcessorFactory
}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{ShutdownReason, Worker}
import com.amazonaws.services.kinesis.model.Record

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}

class KinesisWorkerSourceStage(
bufferSize: Int,
workerBuilder: IRecordProcessorFactory => Worker,
workerExecutor: ExecutionContext
) extends GraphStageWithMaterializedValue[SourceShape[CommittableRecord], Worker] {

private val out: Outlet[CommittableRecord] = Outlet("records")
override val shape: SourceShape[CommittableRecord] = SourceShape(out)

override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Worker) = {
val logic = new Logic()
logic -> logic.worker
}

private class Logic extends GraphStageLogic(shape) with StageLogging {
private val buffer = new util.ArrayDeque[CommittableRecord]()
// We're transmitting backpressure to the worker with a semaphore instance
// worker callback ~> semaphore.acquire ~> push downstream ~> semaphore.release
private val semaphore = new Semaphore(bufferSize)

private[KinesisWorkerSourceStage] var worker: Worker = _

private def tryToProduce(): Unit =
if (!buffer.isEmpty && isAvailable(out)) {
push(out, buffer.poll())
semaphore.release()
}

override def preStart(): Unit = {
val callback = getAsyncCallback[CommittableRecord] { tuple =>
semaphore.acquire()
buffer.offer(tuple)
tryToProduce()
}
val shutdownCallback = getAsyncCallback[ShutdownReason](reason => failStage(UnexpectedShutdown(reason)))
worker = workerBuilder(
new IRecordProcessorFactory {
override def createProcessor(): IRecordProcessor = new IRecordProcessor {
var shardId: String = _
override def initialize(shardId: String): Unit =
this.shardId = shardId
override def processRecords(records: util.List[Record], checkpointer: IRecordProcessorCheckpointer): Unit =
records.asScala.foreach(r => callback.invoke(new CommittableRecord(shardId, r, checkpointer)))
override def shutdown(checkpointer: IRecordProcessorCheckpointer, reason: ShutdownReason): Unit =
shutdownCallback.invoke(reason)
}
}
)
Future(worker.run())(workerExecutor)
}

override def postStop(): Unit =
worker.shutdown()

setHandler(out, new OutHandler {
override def onPull(): Unit =
tryToProduce()
})
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package akka.stream.alpakka.kinesis.scaladsl

import akka.NotUsed
import akka.stream.alpakka.kinesis.KinesisSourceErrors.NoShardsError
import akka.stream.alpakka.kinesis.KinesisErrors.NoShardsError
import akka.stream.alpakka.kinesis.{KinesisSourceStage, ShardSettings}
import akka.stream.scaladsl.{Merge, Source}
import com.amazonaws.services.kinesis.AmazonKinesisAsync
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.kinesis.scaladsl

import akka.NotUsed
import akka.stream.alpakka.kinesis.{
CommittableRecord,
KinesisWorkerCheckpointSettings,
KinesisWorkerSourceSettings,
KinesisWorkerSourceStage
}
import akka.stream.scaladsl.{Flow, Source}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
import com.amazonaws.services.kinesis.model.Record

import scala.concurrent.{ExecutionContext, Future}

object KinesisWorkerSource {

def apply(
workerBuilder: IRecordProcessorFactory => Worker,
settings: KinesisWorkerSourceSettings = KinesisWorkerSourceSettings.defaultInstance
)(implicit workerExecutor: ExecutionContext): Source[CommittableRecord, Worker] =
Source.fromGraph(new KinesisWorkerSourceStage(settings.bufferSize, workerBuilder, workerExecutor))

def checkpointRecords(settings: KinesisWorkerCheckpointSettings = KinesisWorkerCheckpointSettings.defaultInstance)(
implicit checkpointExecutor: ExecutionContext
): Flow[CommittableRecord, Record, NotUsed] =
Flow[CommittableRecord]
.groupBy(MAX_KINESIS_SHARDS, _.shardId)
.groupedWithin(settings.maxBatchSize, settings.maxBatchWait)
.map(records => records -> records.max)
.mapAsync(1) {
case (records, highestRecord) =>
Future {
highestRecord.checkpoint()
records
}
}
.mapConcat(identity)
.map(_.record)
.mergeSubstreams

// http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html
private val MAX_KINESIS_SHARDS = 500

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import java.util
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicReference

import akka.stream.alpakka.kinesis.KinesisSourceErrors.NoShardsError
import akka.stream.alpakka.kinesis.scaladsl.KinesisSource
import akka.stream.testkit.scaladsl.TestSink
import akka.util.ByteString
Expand Down Expand Up @@ -130,7 +129,7 @@ class KinesisSourceSpec extends WordSpecLike with Matchers with DefaultTestConte
with WithGetRecordsFailure {
val probe = KinesisSource.basic(shardSettings, amazonKinesisAsync).runWith(TestSink.probe)
probe.request(1)
probe.expectError() shouldBe an[KinesisSourceErrors.GetRecordsError.type]
probe.expectError() shouldBe an[KinesisErrors.GetRecordsError.type]
}
}

Expand Down
3 changes: 2 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ object Dependencies {

val Kinesis = Seq(
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-kinesis" % "1.11.95", // ApacheV2
"com.amazonaws" % "aws-java-sdk-kinesis" % "1.11.172", // ApacheV2
"com.amazonaws" % "amazon-kinesis-client" % "1.8.1",
"org.mockito" % "mockito-core" % "2.7.11" % Test // MIT
)
)
Expand Down

0 comments on commit 67bb52a

Please sign in to comment.