Skip to content

Commit

Permalink
Merge pull request #4 from StreetContxt/more-stats
Browse files Browse the repository at this point in the history
More Stats, Minor fixes
  • Loading branch information
agenovese authored Jan 24, 2018
2 parents 5c40ec4 + 29362e9 commit 958a83f
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 52 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ val TestAndIntegrationTest = "test,it"

organization in ThisBuild := "com.contxt"
scalaVersion in ThisBuild := "2.11.8"
version in ThisBuild := "1.0.1-SNAPSHOT"
version in ThisBuild := "1.0.2-SNAPSHOT"

val slf4j = "org.slf4j" % "slf4j-api" % "1.7.21"
val logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
val amazonKinesisClient = "com.amazonaws" % "amazon-kinesis-client" % "1.8.8"
val scalaKinesisProducer = "com.contxt" %% "kpl-scala" % "1.0.0-SNAPSHOT"
val scalaKinesisProducer = "com.contxt" %% "kpl-scala" % "1.0.2-SNAPSHOT"
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.1"
val scalaMock = "org.scalamock" %% "scalamock-scalatest-support" % "3.6.0"
val akkaStream = "com.typesafe.akka" %% "akka-stream" % "2.5.6"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ com.contxt.kinesis {

shard-checkpoint-config {
checkpoint-period = 60 seconds
checkpoint-after-completing-nr-of-records = 10000
checkpoint-after-processing-nr-of-records = 10000
max-wait-for-completion-on-stream-shutdown = 5 seconds
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/com/contxt/kinesis/ConsumerStats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ trait ConsumerStats {
shardConsumerId: ShardConsumerId, batchSize: Int
)(closure: => Future[QueueOfferResult]): Future[QueueOfferResult]

def recordNrOfInFlightRecords(shardConsumerId: ShardConsumerId, totalCount: Int): Unit
def recordNrOfProcessedUncheckpointedRecords(shardConsumerId: ShardConsumerId, totalCount: Int): Unit

def reportInitialization(shardConsumerId: ShardConsumerId): Unit
def reportShutdown(shardConsumerId: ShardConsumerId, reason: ShutdownReason): Unit
}
Expand Down Expand Up @@ -50,6 +53,9 @@ class NoopConsumerStats extends ConsumerStats {
shardConsumerId: ShardConsumerId, batchSize: Int
)(closure: => Future[QueueOfferResult]): Future[QueueOfferResult] = closure

def recordNrOfInFlightRecords(shardConsumerId: ShardConsumerId, totalCount: Int): Unit = {}
def recordNrOfProcessedUncheckpointedRecords(shardConsumerId: ShardConsumerId, totalCount: Int): Unit = {}

def reportInitialization(shardConsumerId: ShardConsumerId): Unit = {}
def reportShutdown(shardConsumerId: ShardConsumerId, reason: ShutdownReason): Unit = {}
}
70 changes: 44 additions & 26 deletions src/main/scala/com/contxt/kinesis/RecordProcessorImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,61 @@ import scala.util.control.NonFatal
import scala.collection.JavaConversions._

private[kinesis] class ShardCheckpointTracker(shardCheckpointConfig: ShardCheckpointConfig) {
private var inFlightRecords = Queue.empty[KinesisRecord]
private var unprocessedInFlightRecords = Queue.empty[KinesisRecord]
private var lastCheckpointedAt = ZonedDateTime.now()
private var lastCompletedButNotCheckpointed = Option.empty[KinesisRecord]
private var completedButNotCheckpointedCount = 0
private var lastProcessedButNotCheckpointed = Option.empty[KinesisRecord]
private var processedButNotCheckpointedCount = 0

def nrOfInFlightRecords: Int = {
unprocessedInFlightRecords.size + processedButNotCheckpointedCount
}
def nrOfProcessedUncheckpointedRecords: Int = {
popProcessedRecords()
processedButNotCheckpointedCount
}

def watchForCompletion(records: Iterable[KinesisRecord]): Unit = {
inFlightRecords ++= records
unprocessedInFlightRecords ++= records
}

def shouldCheckpoint: Boolean = {
popCompletedRecords()
popProcessedRecords()

completedButNotCheckpointedCount >= shardCheckpointConfig.checkpointAfterCompletingNrOfRecords ||
processedButNotCheckpointedCount >= shardCheckpointConfig.checkpointAfterProcessingNrOfRecords ||
durationSinceLastCheckpoint() >= shardCheckpointConfig.checkpointPeriod
}

def checkpointLastCompletedRecord(checkpointLogic: KinesisRecord => Unit): Unit = {
popCompletedRecords()
def checkpointLastProcessedRecord(checkpointLogic: KinesisRecord => Unit): Unit = {
popProcessedRecords()

lastCompletedButNotCheckpointed.foreach { kinesisRecord =>
lastProcessedButNotCheckpointed.foreach { kinesisRecord =>
try {
checkpointLogic(kinesisRecord)
lastCompletedButNotCheckpointed = None
lastProcessedButNotCheckpointed = None
}
finally {
clearCheckpointTriggers()
}
}
}

def allInFlightRecordsCompeted: Boolean = inFlightRecords.forall(_.completionFuture.isCompleted)
def allInFlightRecordsProcessed: Boolean = unprocessedInFlightRecords.forall(_.completionFuture.isCompleted)

def allInFlightRecordsCompetedFuture(implicit ec: ExecutionContext): Future[Done] = {
def allInFlightRecordsProcessedFuture(implicit ec: ExecutionContext): Future[Done] = {
Future
.sequence(inFlightRecords.map(_.completionFuture))
.sequence(unprocessedInFlightRecords.map(_.completionFuture))
.map(_ => Done)
}

private def popCompletedRecords(): Unit = {
val completedRecords = inFlightRecords.takeWhile(_.completionFuture.isCompleted)
inFlightRecords = inFlightRecords.drop(completedRecords.size)
completedButNotCheckpointedCount += completedRecords.size
lastCompletedButNotCheckpointed = completedRecords.lastOption.orElse(lastCompletedButNotCheckpointed)
private def popProcessedRecords(): Unit = {
val processedRecords = unprocessedInFlightRecords.takeWhile(_.completionFuture.isCompleted)
unprocessedInFlightRecords = unprocessedInFlightRecords.drop(processedRecords.size)
processedButNotCheckpointedCount += processedRecords.size
lastProcessedButNotCheckpointed = processedRecords.lastOption.orElse(lastProcessedButNotCheckpointed)
}

private def clearCheckpointTriggers(): Unit = {
completedButNotCheckpointedCount = 0
processedButNotCheckpointedCount = 0
lastCheckpointedAt = ZonedDateTime.now()
}

Expand Down Expand Up @@ -99,8 +107,9 @@ private[kinesis] class RecordProcessorImpl(
val records = processRecordsInput.getRecords.toIndexedSeq
val kinesisRecords = records.map(KinesisRecord.fromMutableRecord)
shardCheckpointTracker.watchForCompletion(kinesisRecords)
if (kinesisRecords.nonEmpty) blockToEnqueueAndHandleResult(kinesisRecords)
recordCheckpointerStats()
if (shardCheckpointTracker.shouldCheckpoint) checkpointAndHandleErrors(processRecordsInput.getCheckpointer)
if (kinesisRecords.nonEmpty) blockToEnqueueAndHandleResult(kinesisRecords)
}
catch {
case NonFatal(e) =>
Expand Down Expand Up @@ -170,15 +179,15 @@ private[kinesis] class RecordProcessorImpl(

private def checkpointAndHandleErrors(checkpointer: IRecordProcessorCheckpointer, shardEnd: Boolean = false): Unit = {
try {
if (shardEnd && shardCheckpointTracker.allInFlightRecordsCompeted) {
if (shardEnd && shardCheckpointTracker.allInFlightRecordsProcessed) {
// Checkpointing the actual offset is not enough. Instead, we are required to use the checkpoint()
// method without arguments, which is not covered by Kinesis documentation.
checkpointer.checkpoint()
consumerStats.checkpointShardEndAcked(shardConsumerId)
log.info(s"Successfully checkpointed $shardConsumerId at SHARD_END.")
}
else {
shardCheckpointTracker.checkpointLastCompletedRecord { kinesisRecord =>
shardCheckpointTracker.checkpointLastProcessedRecord { kinesisRecord =>
val seqNumber = kinesisRecord.sequenceNumber
val subSeqNumber = kinesisRecord.subSequenceNumber.getOrElse(0L)
checkpointer.checkpoint(seqNumber, subSeqNumber)
Expand All @@ -202,13 +211,22 @@ private[kinesis] class RecordProcessorImpl(
}
}

private def recordCheckpointerStats(): Unit = {
consumerStats.recordNrOfInFlightRecords(
shardConsumerId, shardCheckpointTracker.nrOfInFlightRecords
)
consumerStats.recordNrOfProcessedUncheckpointedRecords(
shardConsumerId, shardCheckpointTracker.nrOfProcessedUncheckpointedRecords
)
}

private def waitForInFlightRecordsOrTermination(): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global

val allCompletedOrTermination = Future.firstCompletedOf(Seq(
shardCheckpointTracker.allInFlightRecordsCompetedFuture, streamTerminationFuture
val allProcessedOrTermination = Future.firstCompletedOf(Seq(
shardCheckpointTracker.allInFlightRecordsProcessedFuture, streamTerminationFuture
))
Try(Await.result(allCompletedOrTermination, Duration.Inf))
Try(Await.result(allProcessedOrTermination, Duration.Inf))
}

private def waitForInFlightRecordsUnlessStreamFailed(waitDuration: Duration): Unit = {
Expand All @@ -218,7 +236,7 @@ private[kinesis] class RecordProcessorImpl(
if (streamTerminationFuture.isCompleted) Try(Await.result(streamTerminationFuture, 0.seconds)).isFailure
else false
}
if (!hasStreamFailed) Try(Await.result(shardCheckpointTracker.allInFlightRecordsCompetedFuture, waitDuration))
if (!hasStreamFailed) Try(Await.result(shardCheckpointTracker.allInFlightRecordsProcessedFuture, waitDuration))
}

private def getOffsetString(n: ExtendedSequenceNumber): String = {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/contxt/kinesis/ShardCheckpointConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import scala.concurrent.duration._

case class ShardCheckpointConfig(
checkpointPeriod: Duration,
checkpointAfterCompletingNrOfRecords: Int,
checkpointAfterProcessingNrOfRecords: Int,
maxWaitForCompletionOnStreamShutdown: Duration
)

Expand All @@ -14,7 +14,7 @@ object ShardCheckpointConfig {
val localConfig = config.getConfig("com.contxt.kinesis.consumer.shard-checkpoint-config")
ShardCheckpointConfig(
checkpointPeriod = localConfig.getDuration("checkpoint-period").toMillis.millis,
checkpointAfterCompletingNrOfRecords = localConfig.getInt("checkpoint-after-completing-nr-of-records"),
checkpointAfterProcessingNrOfRecords = localConfig.getInt("checkpoint-after-processing-nr-of-records"),
maxWaitForCompletionOnStreamShutdown = localConfig.getDuration("max-wait-for-completion-on-stream-shutdown").toMillis.millis
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class KinesisSourceFactoryTest

private val shardCheckpointConfig = ShardCheckpointConfig(
checkpointPeriod = 1.minutes,
checkpointAfterCompletingNrOfRecords = 10000,
checkpointAfterProcessingNrOfRecords = 10000,
maxWaitForCompletionOnStreamShutdown = 4.seconds
)

Expand Down
40 changes: 20 additions & 20 deletions src/test/scala/com/contxt/kinesis/ShardCheckpointTrackerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ShardCheckpointTrackerTest extends WordSpec with Matchers {

"checkpoint on target record count" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(checkpoinConfig.checkpointAfterCompletingNrOfRecords)
val records = mkRecrods(checkpoinConfig.checkpointAfterProcessingNrOfRecords)
tracker.watchForCompletion(records)
records.foreach(_.markProcessed())
tracker.shouldCheckpoint shouldBe true
Expand All @@ -37,27 +37,27 @@ class ShardCheckpointTrackerTest extends WordSpec with Matchers {
}
}

"checkpointing last completed record" should {
"do nothing if no completed records" in {
"checkpointing last processed record" should {
"do nothing if no processed records" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(1)
tracker.watchForCompletion(records)

var checkpointedRecord = Option.empty[KinesisRecord]
tracker.checkpointLastCompletedRecord { record =>
tracker.checkpointLastProcessedRecord { record =>
checkpointedRecord = Some(record)
}
checkpointedRecord shouldBe None
}

"checkpoint the last completed record in a sequence" in {
"checkpoint the last processed record in a sequence" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(3)
tracker.watchForCompletion(records)
records.take(2).foreach(_.markProcessed())

var checkpointedRecord = Option.empty[KinesisRecord]
tracker.checkpointLastCompletedRecord { record =>
tracker.checkpointLastProcessedRecord { record =>
checkpointedRecord = Some(record)
}
checkpointedRecord shouldBe Some(records(1))
Expand All @@ -70,7 +70,7 @@ class ShardCheckpointTrackerTest extends WordSpec with Matchers {
Seq(records(0), records(1), records(3)).foreach(_.markProcessed())

var checkpointedRecord = Option.empty[KinesisRecord]
tracker.checkpointLastCompletedRecord { record =>
tracker.checkpointLastProcessedRecord { record =>
checkpointedRecord = Some(record)
}
checkpointedRecord shouldBe Some(records(1))
Expand All @@ -84,72 +84,72 @@ class ShardCheckpointTrackerTest extends WordSpec with Matchers {

tracker.shouldCheckpoint shouldBe true
a[TestException] shouldBe thrownBy {
tracker.checkpointLastCompletedRecord { record =>
tracker.checkpointLastProcessedRecord { record =>
throw new TestException
}
}
tracker.shouldCheckpoint shouldBe false
}

"rethrow an exception and keep the last completed record" in {
"rethrow an exception and keep the last processed record" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(2)
tracker.watchForCompletion(records)
records.foreach(_.markProcessed())

var checkpointedRecord1 = Option.empty[KinesisRecord]
a[TestException] shouldBe thrownBy {
tracker.checkpointLastCompletedRecord { record =>
tracker.checkpointLastProcessedRecord { record =>
checkpointedRecord1 = Some(record)
throw new TestException
}
}
checkpointedRecord1 shouldBe Some(records(1))

var checkpointedRecord2 = Option.empty[KinesisRecord]
tracker.checkpointLastCompletedRecord { record =>
tracker.checkpointLastProcessedRecord { record =>
checkpointedRecord2 = Some(record)
}
checkpointedRecord2 shouldBe Some(records(1))
}
}

"checking for in flight record completion" should {
"return true if all records are completed" in {
"return true if all records are processed" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(3)
tracker.watchForCompletion(records)
records.foreach(_.markProcessed())
tracker.allInFlightRecordsCompeted shouldBe true
tracker.allInFlightRecordsProcessed shouldBe true
}

"return false if at least one record is not completed" in {
"return false if at least one record is not processed" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(3)
tracker.watchForCompletion(records)
Seq(records(0), records(2)).foreach(_.markProcessed())
tracker.allInFlightRecordsCompeted shouldBe false
tracker.allInFlightRecordsProcessed shouldBe false
}
}

"creating in flight record completion future" should {
"return future that completes when all records complete" in {
"return a future that completes when all the records are processed" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(3)
tracker.watchForCompletion(records)
records.foreach(_.markProcessed())
import scala.concurrent.ExecutionContext.Implicits.global
Await.result(tracker.allInFlightRecordsCompetedFuture, completionFutureAwaitDuration) shouldBe Done
Await.result(tracker.allInFlightRecordsProcessedFuture, completionFutureAwaitDuration) shouldBe Done
}

"return future that wont complete as long as one record is incomplete" in {
"return a future that wont complete as long as a record remains unprocessed" in {
val tracker = mkCheckpointTracker()
val records = mkRecrods(3)
tracker.watchForCompletion(records)
Seq(records(0), records(2)).foreach(_.markProcessed())
a[TimeoutException] shouldBe thrownBy {
import scala.concurrent.ExecutionContext.Implicits.global
Await.result(tracker.allInFlightRecordsCompetedFuture, completionFutureAwaitDuration)
Await.result(tracker.allInFlightRecordsProcessedFuture, completionFutureAwaitDuration)
}
}
}
Expand All @@ -159,7 +159,7 @@ class ShardCheckpointTrackerTest extends WordSpec with Matchers {

private val checkpoinConfig = ShardCheckpointConfig(
checkpointPeriod = 2.second,
checkpointAfterCompletingNrOfRecords = 2,
checkpointAfterProcessingNrOfRecords = 2,
maxWaitForCompletionOnStreamShutdown = 2.second
)
private def mkCheckpointTracker() = new ShardCheckpointTracker(checkpoinConfig)
Expand Down

0 comments on commit 958a83f

Please sign in to comment.