From cfd1f481b00ad2833ae09e56dc0417f8bd4b836b Mon Sep 17 00:00:00 2001 From: Aleksey Nikiforov Date: Thu, 11 Jan 2018 17:42:56 -0500 Subject: [PATCH] Tweaked stats, fixed test, renamed a val. --- build.sbt | 2 +- .../com/contxt/kinesis/MessageUtil.scala | 14 ++++++---- .../com/contxt/kinesis/MessageUtilTest.scala | 28 +++++++++---------- .../com/contxt/kinesis/KinesisSource.scala | 10 +++---- .../contxt/kinesis/RecordProcessorImpl.scala | 6 ++-- 5 files changed, 31 insertions(+), 29 deletions(-) diff --git a/build.sbt b/build.sbt index e552960..bd6ec6e 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ val TestAndIntegrationTest = "test,it" organization in ThisBuild := "com.contxt" scalaVersion in ThisBuild := "2.11.8" -version in ThisBuild := "1.0.0-SNAPSHOT" +version in ThisBuild := "1.0.1-SNAPSHOT" val slf4j = "org.slf4j" % "slf4j-api" % "1.7.21" val logback = "ch.qos.logback" % "logback-classic" % "1.2.3" diff --git a/src/it/scala/com/contxt/kinesis/MessageUtil.scala b/src/it/scala/com/contxt/kinesis/MessageUtil.scala index f617807..fb39e12 100644 --- a/src/it/scala/com/contxt/kinesis/MessageUtil.scala +++ b/src/it/scala/com/contxt/kinesis/MessageUtil.scala @@ -6,7 +6,7 @@ object MessageUtil { * Sending a message batch can be retried, as long as the order of messages remains the same. * Processing of messages can restart at an earlier checkpoint, as long as the order of messages remains the same. */ def dedupAndGroupByKey(keyMessagePairs: Seq[(String, String)]): Map[String, IndexedSeq[String]] = { - groupByKey(keyMessagePairs).mapValues(removeReprocessed) + groupByKey(keyMessagePairs).map { case (key, value) => key -> removeReprocessed(key, value) } } def groupByKey(keyMessagePairs: Seq[(String, String)]): Map[String, IndexedSeq[String]] = { @@ -17,7 +17,7 @@ object MessageUtil { } } - private[kinesis] def removeReprocessed(messages: IndexedSeq[String]): IndexedSeq[String] = { + private[kinesis] def removeReprocessed(key: String, messages: IndexedSeq[String]): IndexedSeq[String] = { def unwindRetry(sliceCandidate: IndexedSeq[String], from: Int): Int = { var i = 0 while (from + i < messages.size && i < sliceCandidate.size && sliceCandidate(i) == messages(from + i)) i += 1 @@ -40,12 +40,12 @@ object MessageUtil { val lastMessage = messages(j) if (lastDistinct.isEmpty || lastDistinct.get != lastMessage) { val restartedAt = distinct.lastIndexOf(lastMessage) - if (restartedAt < lastRestartedAt) throw new UnexpectedMessageSequence(lastMessage, messages) + if (restartedAt < lastRestartedAt) throw new UnexpectedMessageSequence(key, lastMessage, messages) lastRestartedAt = restartedAt val reprocessedSliceCandidate = distinct.slice(restartedAt, i) val lastIndexOfRetrySequence = unwindRetries(reprocessedSliceCandidate, j) - 1 if (lastIndexOfRetrySequence < j || reprocessedSliceCandidate.last != messages(lastIndexOfRetrySequence)) { - throw new UnexpectedMessageSequence(lastMessage, messages) + throw new UnexpectedMessageSequence(key, lastMessage, messages) } j = lastIndexOfRetrySequence + 1 } @@ -57,6 +57,8 @@ object MessageUtil { distinct } - private class UnexpectedMessageSequence(lastMessage: String, messages: IndexedSeq[String]) - extends Exception(s"Messages starting from `$lastMessage` were processed out of order: ${messages.mkString(",")}") + private class UnexpectedMessageSequence(key: String, lastMessage: String, messages: IndexedSeq[String]) + extends Exception( + s"Messages for key `$key` starting from `$lastMessage` were processed out of order: ${messages.mkString(",")}" + ) } diff --git a/src/it/scala/com/contxt/kinesis/MessageUtilTest.scala b/src/it/scala/com/contxt/kinesis/MessageUtilTest.scala index 59a3175..8bf9dd1 100644 --- a/src/it/scala/com/contxt/kinesis/MessageUtilTest.scala +++ b/src/it/scala/com/contxt/kinesis/MessageUtilTest.scala @@ -21,7 +21,7 @@ class MessageUtilTest extends WordSpec with Matchers { "removing reprocessed messages" should { "keep the original sequence when there is no duplication" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3" )) shouldEqual IndexedSeq( "m1", "m2", "m3" @@ -30,7 +30,7 @@ class MessageUtilTest extends WordSpec with Matchers { "detect replay mismatch in the beginning" in { an[Exception] should be thrownBy{ - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m1", "m3" )) } @@ -38,7 +38,7 @@ class MessageUtilTest extends WordSpec with Matchers { "detect replay mismatch in the middle" in { an[Exception] should be thrownBy{ - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m2", "m4", "m5" )) } @@ -46,7 +46,7 @@ class MessageUtilTest extends WordSpec with Matchers { "detect replay mismatch at the end" in { an[Exception] should be thrownBy{ - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m2", "m4" )) } @@ -54,7 +54,7 @@ class MessageUtilTest extends WordSpec with Matchers { "detect reordering of messages in the beginning" in { an[Exception] should be thrownBy{ - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m2", "m1", "m3" )) } @@ -62,7 +62,7 @@ class MessageUtilTest extends WordSpec with Matchers { "detect reordering of messages in the middle" in { an[Exception] should be thrownBy{ - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m3", "m2", "m4" )) } @@ -70,7 +70,7 @@ class MessageUtilTest extends WordSpec with Matchers { "detect reordering of messages at the end" in { an[Exception] should be thrownBy{ - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m3", "m2" )) } @@ -79,7 +79,7 @@ class MessageUtilTest extends WordSpec with Matchers { "removing single reprocessed message" should { "handle repeated leading message" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m1", "m1", "m2", "m3" )) shouldEqual IndexedSeq( "m1", "m2", "m3" @@ -87,7 +87,7 @@ class MessageUtilTest extends WordSpec with Matchers { } "handle repeated message in the middle" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m2", "m2", "m3" )) shouldEqual IndexedSeq( "m1", "m2", "m3" @@ -95,7 +95,7 @@ class MessageUtilTest extends WordSpec with Matchers { } "handle repeated trailing message" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m3", "m3" )) shouldEqual IndexedSeq( "m1", "m2", "m3" @@ -105,7 +105,7 @@ class MessageUtilTest extends WordSpec with Matchers { "removing a sequence of reprocessed messages" should { "handle repeated leading sequence" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m1", "m2", "m3" )) shouldEqual IndexedSeq( "m1", "m2", "m3" @@ -113,7 +113,7 @@ class MessageUtilTest extends WordSpec with Matchers { } "handle repeated sequence in the middle" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m2", "m3", "m4" )) shouldEqual IndexedSeq( "m1", "m2", "m3", "m4" @@ -121,7 +121,7 @@ class MessageUtilTest extends WordSpec with Matchers { } "handle repeated trailing sequence" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m2", "m3" )) shouldEqual IndexedSeq( "m1", "m2", "m3" @@ -130,7 +130,7 @@ class MessageUtilTest extends WordSpec with Matchers { } "handle repeated retry sequences" in { - MessageUtil.removeReprocessed(IndexedSeq( + MessageUtil.removeReprocessed("key1", IndexedSeq( "m1", "m2", "m3", "m2", "m2", "m3", "m4" )) shouldEqual IndexedSeq( "m1", "m2", "m3", "m4" diff --git a/src/main/scala/com/contxt/kinesis/KinesisSource.scala b/src/main/scala/com/contxt/kinesis/KinesisSource.scala index 60d358b..95b93a1 100644 --- a/src/main/scala/com/contxt/kinesis/KinesisSource.scala +++ b/src/main/scala/com/contxt/kinesis/KinesisSource.scala @@ -69,14 +69,14 @@ object KinesisSource { .source[IndexedSeq[KinesisRecord]](perProducerBufferSize = 1) .viaMat(KillSwitches.single)(Keep.both) .watchTermination()(Keep.both) - .mapMaterializedValue { case ((mergeSink, streamKillSwitch), terminationFuture) => + .mapMaterializedValue { case ((mergeSink, streamKillSwitch), streamTerminationFuture) => val processorFactory = new RecordProcessorFactoryImpl( kinesisAppId, - streamKillSwitch, terminationFuture, + streamKillSwitch, streamTerminationFuture, mergeSink, shardCheckpointConfig, consumerStats ) - createAndStartKclWorker(workerFactory, processorFactory, kclConfig, streamKillSwitch, terminationFuture) + createAndStartKclWorker(workerFactory, processorFactory, kclConfig, streamKillSwitch, streamTerminationFuture) } .mapConcat(_.toIndexedSeq) } @@ -133,7 +133,7 @@ private[kinesis] class ManagedKinesisWorker(private val worker: Worker) extends private[kinesis] class RecordProcessorFactoryImpl( kinesisAppId: KinesisAppId, streamKillSwitch: KillSwitch, - terminationFuture: Future[Done], + streamTerminationFuture: Future[Done], mergeSink: Sink[IndexedSeq[KinesisRecord], NotUsed], shardCheckpointConfig: ShardCheckpointConfig, consumerStats: ConsumerStats @@ -146,7 +146,7 @@ private[kinesis] class RecordProcessorFactoryImpl( new RecordProcessorImpl( kinesisAppId, - streamKillSwitch, terminationFuture, + streamKillSwitch, streamTerminationFuture, queue, shardCheckpointConfig, consumerStats ) diff --git a/src/main/scala/com/contxt/kinesis/RecordProcessorImpl.scala b/src/main/scala/com/contxt/kinesis/RecordProcessorImpl.scala index d295e29..8dd1d37 100644 --- a/src/main/scala/com/contxt/kinesis/RecordProcessorImpl.scala +++ b/src/main/scala/com/contxt/kinesis/RecordProcessorImpl.scala @@ -76,7 +76,7 @@ private[kinesis] class ShardCheckpointTracker(shardCheckpointConfig: ShardCheckp private[kinesis] class RecordProcessorImpl( kinesisAppId: KinesisAppId, streamKillSwitch: KillSwitch, - terminationFuture: Future[Done], + streamTerminationFuture: Future[Done], queue: SourceQueueWithComplete[IndexedSeq[KinesisRecord]], shardCheckpointConfig: ShardCheckpointConfig, consumerStats: ConsumerStats @@ -206,7 +206,7 @@ private[kinesis] class RecordProcessorImpl( import scala.concurrent.ExecutionContext.Implicits.global val allCompletedOrTermination = Future.firstCompletedOf(Seq( - shardCheckpointTracker.allInFlightRecordsCompetedFuture, terminationFuture + shardCheckpointTracker.allInFlightRecordsCompetedFuture, streamTerminationFuture )) Try(Await.result(allCompletedOrTermination, Duration.Inf)) } @@ -215,7 +215,7 @@ private[kinesis] class RecordProcessorImpl( import scala.concurrent.ExecutionContext.Implicits.global val hasStreamFailed = { - if (terminationFuture.isCompleted) Try(Await.result(terminationFuture, 0.seconds)).isFailure + if (streamTerminationFuture.isCompleted) Try(Await.result(streamTerminationFuture, 0.seconds)).isFailure else false } if (!hasStreamFailed) Try(Await.result(shardCheckpointTracker.allInFlightRecordsCompetedFuture, waitDuration))