Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle adapted java source providers for replay #1279

Merged
merged 2 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ package akka.projection.dynamodb

import java.time.Instant
import java.time.{ Duration => JDuration }
import java.util.Optional
import java.util.UUID
import java.util.concurrent.CompletionException
import java.util.concurrent.CompletionStage
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Supplier
import java.util.{ HashMap => JHashMap }

import scala.annotation.tailrec
Expand All @@ -20,7 +23,9 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters._
import scala.util.Failure
import scala.util.Success

Expand Down Expand Up @@ -159,6 +164,68 @@ object DynamoDBTimestampOffsetProjectionSpec {
}
}

class JavaTestTimestampSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
testSourceProvider: akka.projection.testkit.javadsl.TestSourceProvider[Offset, EventEnvelope[String]],
override val maxSlice: Int,
enableCurrentEventsByPersistenceId: Boolean)
extends akka.projection.javadsl.SourceProvider[Offset, EventEnvelope[String]]
with BySlicesSourceProvider
with akka.persistence.query.typed.javadsl.EventTimestampQuery
with akka.persistence.query.typed.javadsl.LoadEventQuery
with LoadEventsByPersistenceIdSourceProvider[String] {

override def source(offset: Supplier[CompletionStage[Optional[Offset]]])
: CompletionStage[akka.stream.javadsl.Source[EventEnvelope[String], NotUsed]] =
testSourceProvider.source(offset)

override def extractOffset(envelope: EventEnvelope[String]): Offset =
testSourceProvider.extractOffset(envelope)

override def extractCreationTime(envelope: EventEnvelope[String]): Long =
testSourceProvider.extractCreationTime(envelope)

override def minSlice: Int = 0

override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = {
Future
.successful(envelopes.collectFirst {
case env
if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr && env.offset
.isInstanceOf[TimestampOffset] =>
env.offset.asInstanceOf[TimestampOffset].timestamp
}.toJava)
.asJava
}

override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = {
envelopes.collectFirst {
case env if env.persistenceId == persistenceId && env.sequenceNr == sequenceNr =>
env.asInstanceOf[EventEnvelope[Event]]
} match {
case Some(env) => Future.successful(env).asJava
case None =>
Future
.failed(
new NoSuchElementException(
s"Event with persistenceId [$persistenceId] and sequenceNr [$sequenceNr] not found."))
.asJava
}
}

override private[akka] def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Option[Source[EventEnvelope[String], NotUsed]] = {
if (enableCurrentEventsByPersistenceId)
Some(Source(envelopes.filter { env =>
env.persistenceId == persistenceId && env.sequenceNr >= fromSequenceNr && env.sequenceNr <= toSequenceNr
}))
else
None
}
}

// test model is as simple as a text that gets other string concatenated to it
case class ConcatStr(id: String, text: String) {
def concat(newMsg: String): ConcatStr = {
Expand Down Expand Up @@ -326,6 +393,32 @@ class DynamoDBTimestampOffsetProjectionSpec
enableCurrentEventsByPersistenceId)
}

// envelopes are emitted by the "query" source, but allEnvelopes can be loaded
def createJavaSourceProviderWithMoreEnvelopes(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
allEnvelopes: immutable.IndexedSeq[EventEnvelope[String]],
enableCurrentEventsByPersistenceId: Boolean,
complete: Boolean = true): JavaTestTimestampSourceProvider = {
val sp =
akka.projection.testkit.javadsl.TestSourceProvider
.create[Offset, EventEnvelope[String]](akka.stream.javadsl.Source.from(envelopes.asJava), _.offset)
.withStartSourceFrom {
case (lastProcessedOffsetBySlice: TimestampOffsetBySlice, offset: TimestampOffset) =>
// FIXME: should have the envelope slice to handle this properly
val lastProcessedOffset = lastProcessedOffsetBySlice.offsets.head._2
offset.timestamp.isBefore(lastProcessedOffset.timestamp) ||
(offset.timestamp == lastProcessedOffset.timestamp && offset.seen == lastProcessedOffset.seen)
case _ => false
}
.withAllowCompletion(complete)

new JavaTestTimestampSourceProvider(
allEnvelopes,
sp,
persistenceExt.numberOfSlices - 1,
enableCurrentEventsByPersistenceId)
}

def createBacktrackingSourceProvider(
envelopes: immutable.IndexedSeq[EventEnvelope[String]],
complete: Boolean = true): TestTimestampSourceProvider = {
Expand Down Expand Up @@ -1884,6 +1977,129 @@ class DynamoDBTimestampOffsetProjectionSpec
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}

"replay rejected sequence numbers for at-least-once grouped (javadsl)" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val allEnvelopes = createEnvelopes(pid1, 10) ++ createEnvelopes(pid2, 3)
val skipPid1SeqNrs = Set(3L, 4L, 5L, 7L, 9L)
val envelopes = allEnvelopes.filterNot { env =>
(env.persistenceId == pid1 && skipPid1SeqNrs(env.sequenceNr)) ||
(env.persistenceId == pid2 && (env.sequenceNr == 1))
}

val sourceProvider =
createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val results = new ConcurrentHashMap[String, String]()

val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] =
(envelopes: java.util.List[EventEnvelope[String]]) => {
Future {
envelopes.asScala.foreach { envelope =>
results.putIfAbsent(envelope.persistenceId, "|")
results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|")
}
}.map(_ => Done.getInstance()).asJava
}

val projection =
akka.projection.dynamodb.javadsl.DynamoDBProjection
.atLeastOnceGroupedWithin(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava,
sourceProvider,
handler = () => handler,
system)
.withGroup(8, 3.seconds.toJava)

offsetShouldBeEmpty()

projectionTestKit.run(projection) {
results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|e10|"
results.get(pid2) shouldBe "|e1|e2|e3|"
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}

"replay rejected sequence numbers due to clock skew on event write for at-least-once grouped (javadsl)" in {
val pid1 = UUID.randomUUID().toString
val pid2 = UUID.randomUUID().toString
val projectionId = genRandomProjectionId()

val start = tick().instant()

def createEnvelopesFor(
pid: Pid,
fromSeqNr: Int,
toSeqNr: Int,
fromTimestamp: Instant): immutable.IndexedSeq[EventEnvelope[String]] = {
(fromSeqNr to toSeqNr).map { n =>
createEnvelope(pid, n, fromTimestamp.plusSeconds(n - fromSeqNr), s"e$n")
}
}

val envelopes1 =
createEnvelopesFor(pid1, 1, 2, start) ++
createEnvelopesFor(pid1, 3, 4, start.plusSeconds(4)) ++ // gap
createEnvelopesFor(pid1, 5, 9, start.plusSeconds(2)) // clock skew, back 2, and then overlapping

val envelopes2 =
createEnvelopesFor(pid2, 1, 3, start.plusSeconds(10)) ++
createEnvelopesFor(pid2, 4, 6, start.plusSeconds(1)) ++ // clock skew, back 9
createEnvelopesFor(pid2, 7, 9, start.plusSeconds(20)) // and gap

val allEnvelopes = envelopes1 ++ envelopes2

val envelopes = allEnvelopes.sortBy(_.timestamp)

val sourceProvider =
createJavaSourceProviderWithMoreEnvelopes(envelopes, allEnvelopes, enableCurrentEventsByPersistenceId = true)

implicit val offsetStore: DynamoDBOffsetStore =
new DynamoDBOffsetStore(projectionId, Some(sourceProvider), system, settings, client)

val results = new ConcurrentHashMap[String, String]()

val handler: akka.projection.javadsl.Handler[java.util.List[EventEnvelope[String]]] =
(envelopes: java.util.List[EventEnvelope[String]]) => {
Future {
envelopes.asScala.foreach { envelope =>
results.putIfAbsent(envelope.persistenceId, "|")
results.computeIfPresent(envelope.persistenceId, (_, value) => value + envelope.event + "|")
}
}.map(_ => Done.getInstance()).asJava
}

val projection =
akka.projection.dynamodb.javadsl.DynamoDBProjection
.atLeastOnceGroupedWithin(
projectionId,
Some(settings.withReplayOnRejectedSequenceNumbers(true)).toJava,
sourceProvider,
handler = () => handler,
system)
.withGroup(2, 3.seconds.toJava)

offsetShouldBeEmpty()

projectionTestKit.run(projection) {
results.get(pid1) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|"
results.get(pid2) shouldBe "|e1|e2|e3|e4|e5|e6|e7|e8|e9|"
}

eventually {
latestOffsetShouldBe(allEnvelopes.last.offset)
}
}
}

"A DynamoDB flow projection with TimestampOffset" must {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import akka.projection.internal.GroupedHandlerStrategy
import akka.projection.internal.HandlerStrategy
import akka.projection.internal.InternalProjection
import akka.projection.internal.InternalProjectionState
import akka.projection.internal.JavaToScalaBySliceSourceProviderAdapter
import akka.projection.internal.ManagementState
import akka.projection.internal.OffsetStoredByHandler
import akka.projection.internal.OffsetStrategy
Expand Down Expand Up @@ -218,7 +219,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -329,7 +334,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -478,7 +487,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -631,7 +644,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down Expand Up @@ -713,7 +730,11 @@ private[projection] object DynamoDBProjectionImpl {
val logPrefix = offsetStore.logPrefix
originalEnvelope match {
case originalEventEnvelope: EventEnvelope[Any @unchecked] if originalEventEnvelope.sequenceNr > 1 =>
sourceProvider match {
val underlyingProvider = sourceProvider match {
case adapted: JavaToScalaBySliceSourceProviderAdapter[_, _] => adapted.delegate
case provider => provider
}
underlyingProvider match {
case provider: LoadEventsByPersistenceIdSourceProvider[Any @unchecked]
if offsetStore.settings.replayOnRejectedSequenceNumbers =>
val persistenceId = originalEventEnvelope.persistenceId
Expand Down