Skip to content

Commit

Permalink
Merge pull request #3 from StreetContxt/tweak-stats
Browse files Browse the repository at this point in the history
Tweaked stats
  • Loading branch information
anikiforovopensource authored Jan 12, 2018
2 parents 7e5009d + cfd1f48 commit 5c40ec4
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 29 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ val TestAndIntegrationTest = "test,it"

organization in ThisBuild := "com.contxt"
scalaVersion in ThisBuild := "2.11.8"
version in ThisBuild := "1.0.0-SNAPSHOT"
version in ThisBuild := "1.0.1-SNAPSHOT"

val slf4j = "org.slf4j" % "slf4j-api" % "1.7.21"
val logback = "ch.qos.logback" % "logback-classic" % "1.2.3"
Expand Down
14 changes: 8 additions & 6 deletions src/it/scala/com/contxt/kinesis/MessageUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object MessageUtil {
* Sending a message batch can be retried, as long as the order of messages remains the same.
* Processing of messages can restart at an earlier checkpoint, as long as the order of messages remains the same. */
def dedupAndGroupByKey(keyMessagePairs: Seq[(String, String)]): Map[String, IndexedSeq[String]] = {
groupByKey(keyMessagePairs).mapValues(removeReprocessed)
groupByKey(keyMessagePairs).map { case (key, value) => key -> removeReprocessed(key, value) }
}

def groupByKey(keyMessagePairs: Seq[(String, String)]): Map[String, IndexedSeq[String]] = {
Expand All @@ -17,7 +17,7 @@ object MessageUtil {
}
}

private[kinesis] def removeReprocessed(messages: IndexedSeq[String]): IndexedSeq[String] = {
private[kinesis] def removeReprocessed(key: String, messages: IndexedSeq[String]): IndexedSeq[String] = {
def unwindRetry(sliceCandidate: IndexedSeq[String], from: Int): Int = {
var i = 0
while (from + i < messages.size && i < sliceCandidate.size && sliceCandidate(i) == messages(from + i)) i += 1
Expand All @@ -40,12 +40,12 @@ object MessageUtil {
val lastMessage = messages(j)
if (lastDistinct.isEmpty || lastDistinct.get != lastMessage) {
val restartedAt = distinct.lastIndexOf(lastMessage)
if (restartedAt < lastRestartedAt) throw new UnexpectedMessageSequence(lastMessage, messages)
if (restartedAt < lastRestartedAt) throw new UnexpectedMessageSequence(key, lastMessage, messages)
lastRestartedAt = restartedAt
val reprocessedSliceCandidate = distinct.slice(restartedAt, i)
val lastIndexOfRetrySequence = unwindRetries(reprocessedSliceCandidate, j) - 1
if (lastIndexOfRetrySequence < j || reprocessedSliceCandidate.last != messages(lastIndexOfRetrySequence)) {
throw new UnexpectedMessageSequence(lastMessage, messages)
throw new UnexpectedMessageSequence(key, lastMessage, messages)
}
j = lastIndexOfRetrySequence + 1
}
Expand All @@ -57,6 +57,8 @@ object MessageUtil {
distinct
}

private class UnexpectedMessageSequence(lastMessage: String, messages: IndexedSeq[String])
extends Exception(s"Messages starting from `$lastMessage` were processed out of order: ${messages.mkString(",")}")
private class UnexpectedMessageSequence(key: String, lastMessage: String, messages: IndexedSeq[String])
extends Exception(
s"Messages for key `$key` starting from `$lastMessage` were processed out of order: ${messages.mkString(",")}"
)
}
28 changes: 14 additions & 14 deletions src/it/scala/com/contxt/kinesis/MessageUtilTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class MessageUtilTest extends WordSpec with Matchers {

"removing reprocessed messages" should {
"keep the original sequence when there is no duplication" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3"
Expand All @@ -30,47 +30,47 @@ class MessageUtilTest extends WordSpec with Matchers {

"detect replay mismatch in the beginning" in {
an[Exception] should be thrownBy{
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m1", "m3"
))
}
}

"detect replay mismatch in the middle" in {
an[Exception] should be thrownBy{
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m2", "m4", "m5"
))
}
}

"detect replay mismatch at the end" in {
an[Exception] should be thrownBy{
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m2", "m4"
))
}
}

"detect reordering of messages in the beginning" in {
an[Exception] should be thrownBy{
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m2", "m1", "m3"
))
}
}

"detect reordering of messages in the middle" in {
an[Exception] should be thrownBy{
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m3", "m2", "m4"
))
}
}

"detect reordering of messages at the end" in {
an[Exception] should be thrownBy{
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m3", "m2"
))
}
Expand All @@ -79,23 +79,23 @@ class MessageUtilTest extends WordSpec with Matchers {

"removing single reprocessed message" should {
"handle repeated leading message" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m1", "m1", "m2", "m3"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3"
)
}

"handle repeated message in the middle" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m2", "m2", "m3"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3"
)
}

"handle repeated trailing message" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m3", "m3"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3"
Expand All @@ -105,23 +105,23 @@ class MessageUtilTest extends WordSpec with Matchers {

"removing a sequence of reprocessed messages" should {
"handle repeated leading sequence" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m1", "m2", "m3"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3"
)
}

"handle repeated sequence in the middle" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m2", "m3", "m4"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3", "m4"
)
}

"handle repeated trailing sequence" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m2", "m3"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3"
Expand All @@ -130,7 +130,7 @@ class MessageUtilTest extends WordSpec with Matchers {
}

"handle repeated retry sequences" in {
MessageUtil.removeReprocessed(IndexedSeq(
MessageUtil.removeReprocessed("key1", IndexedSeq(
"m1", "m2", "m3", "m2", "m2", "m3", "m4"
)) shouldEqual IndexedSeq(
"m1", "m2", "m3", "m4"
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/com/contxt/kinesis/KinesisSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ object KinesisSource {
.source[IndexedSeq[KinesisRecord]](perProducerBufferSize = 1)
.viaMat(KillSwitches.single)(Keep.both)
.watchTermination()(Keep.both)
.mapMaterializedValue { case ((mergeSink, streamKillSwitch), terminationFuture) =>
.mapMaterializedValue { case ((mergeSink, streamKillSwitch), streamTerminationFuture) =>
val processorFactory = new RecordProcessorFactoryImpl(
kinesisAppId,
streamKillSwitch, terminationFuture,
streamKillSwitch, streamTerminationFuture,
mergeSink,
shardCheckpointConfig, consumerStats
)
createAndStartKclWorker(workerFactory, processorFactory, kclConfig, streamKillSwitch, terminationFuture)
createAndStartKclWorker(workerFactory, processorFactory, kclConfig, streamKillSwitch, streamTerminationFuture)
}
.mapConcat(_.toIndexedSeq)
}
Expand Down Expand Up @@ -133,7 +133,7 @@ private[kinesis] class ManagedKinesisWorker(private val worker: Worker) extends
private[kinesis] class RecordProcessorFactoryImpl(
kinesisAppId: KinesisAppId,
streamKillSwitch: KillSwitch,
terminationFuture: Future[Done],
streamTerminationFuture: Future[Done],
mergeSink: Sink[IndexedSeq[KinesisRecord], NotUsed],
shardCheckpointConfig: ShardCheckpointConfig,
consumerStats: ConsumerStats
Expand All @@ -146,7 +146,7 @@ private[kinesis] class RecordProcessorFactoryImpl(

new RecordProcessorImpl(
kinesisAppId,
streamKillSwitch, terminationFuture,
streamKillSwitch, streamTerminationFuture,
queue,
shardCheckpointConfig, consumerStats
)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/com/contxt/kinesis/RecordProcessorImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private[kinesis] class ShardCheckpointTracker(shardCheckpointConfig: ShardCheckp
private[kinesis] class RecordProcessorImpl(
kinesisAppId: KinesisAppId,
streamKillSwitch: KillSwitch,
terminationFuture: Future[Done],
streamTerminationFuture: Future[Done],
queue: SourceQueueWithComplete[IndexedSeq[KinesisRecord]],
shardCheckpointConfig: ShardCheckpointConfig,
consumerStats: ConsumerStats
Expand Down Expand Up @@ -206,7 +206,7 @@ private[kinesis] class RecordProcessorImpl(
import scala.concurrent.ExecutionContext.Implicits.global

val allCompletedOrTermination = Future.firstCompletedOf(Seq(
shardCheckpointTracker.allInFlightRecordsCompetedFuture, terminationFuture
shardCheckpointTracker.allInFlightRecordsCompetedFuture, streamTerminationFuture
))
Try(Await.result(allCompletedOrTermination, Duration.Inf))
}
Expand All @@ -215,7 +215,7 @@ private[kinesis] class RecordProcessorImpl(
import scala.concurrent.ExecutionContext.Implicits.global

val hasStreamFailed = {
if (terminationFuture.isCompleted) Try(Await.result(terminationFuture, 0.seconds)).isFailure
if (streamTerminationFuture.isCompleted) Try(Await.result(streamTerminationFuture, 0.seconds)).isFailure
else false
}
if (!hasStreamFailed) Try(Await.result(shardCheckpointTracker.allInFlightRecordsCompetedFuture, waitDuration))
Expand Down

0 comments on commit 5c40ec4

Please sign in to comment.