From f8f9d2e73776f327d6f5a9cb99b0ed8f1cdaa9a9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 18 Nov 2024 09:19:59 +0100 Subject: [PATCH] fix: DynamoDB evict offsets per slice * don't use evicted state when saving changed timestamp per slice * evict time window for each slice * remove keep-number-of-entries and evict-interval --- .../DynamoDBOffsetStoreStateSpec.scala | 58 +++---- .../DynamoDBTimestampOffsetStoreSpec.scala | 142 ++++++++---------- .../src/main/resources/reference.conf | 8 - .../dynamodb/DynamoDBProjectionSettings.scala | 21 ++- .../internal/DynamoDBOffsetStore.scala | 135 +++++++++-------- 5 files changed, 173 insertions(+), 191 deletions(-) diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala index 13c3e3f60..334869f19 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBOffsetStoreStateSpec.scala @@ -4,6 +4,7 @@ package akka.projection.dynamodb +import java.time.{ Duration => JDuration } import java.time.Instant import akka.persistence.query.TimestampOffset @@ -35,7 +36,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match state1.byPid("p1").seqNr shouldBe 3L state1.offsetBySlice(slice("p1")) shouldBe TimestampOffset(t0.plusMillis(2), Map("p1" -> 3L)) state1.latestTimestamp shouldBe t0.plusMillis(2) - state1.oldestTimestamp shouldBe t0 val state2 = state1.add(Vector(createRecord("p2", 2, t0.plusMillis(1)))) state2.byPid("p1").seqNr shouldBe 3L @@ -44,7 +44,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match state2.offsetBySlice(slice("p2")) shouldBe TimestampOffset(t0.plusMillis(1), Map("p2" -> 2L)) // latest not updated because timestamp of p2 was before latest state2.latestTimestamp shouldBe t0.plusMillis(2) - state2.oldestTimestamp shouldBe t0 val state3 = state2.add(Vector(createRecord("p3", 10, t0.plusMillis(3)))) state3.byPid("p1").seqNr shouldBe 3L @@ -54,7 +53,6 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match slice("p3") should not be slice("p2") state3.offsetBySlice(slice("p3")) shouldBe TimestampOffset(t0.plusMillis(3), Map("p3" -> 10L)) state3.latestTimestamp shouldBe t0.plusMillis(3) - state3.oldestTimestamp shouldBe t0 // same slice and same timestamp, keep both in seen slice("p10084") shouldBe slice("p3") @@ -63,37 +61,45 @@ class DynamoDBOffsetStoreStateSpec extends AnyWordSpec with TestSuite with Match } "evict old" in { - // these pids have the same slice 645, otherwise it will keep one for each slice - val p1 = "p500" - val p2 = "p621" - val p3 = "p742" - val p4 = "p863" - val p5 = "p984" + val p1 = "p500" // slice 645 + val p2 = "p621" // slice 645 + val p3 = "p742" // slice 645 + val p4 = "p863" // slice 645 + val p5 = "p984" // slice 645 + val p6 = "p92" // slice 905 + val p7 = "p108" // slice 905 val t0 = TestClock.nowMillis().instant() val state1 = State.empty .add( Vector( - createRecord(p1, 1, t0), - createRecord(p2, 2, t0.plusMillis(1)), - createRecord(p3, 3, t0.plusMillis(2)), - createRecord(p4, 4, t0.plusMillis(3)), - createRecord(p5, 5, t0.plusMillis(4)))) - state1.oldestTimestamp shouldBe t0 + createRecord(p1, 1, t0.plusMillis(1)), + createRecord(p2, 2, t0.plusMillis(2)), + createRecord(p3, 3, t0.plusMillis(3)), + createRecord(p4, 4, t0.plusMillis(4)), + createRecord(p6, 6, t0.plusMillis(6)))) state1.byPid - .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L) - - val state2 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 1) - state2.oldestTimestamp shouldBe t0.plusMillis(2) - state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p3 -> 3L, p4 -> 4L, p5 -> 5L) + .map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p1 -> 1L, p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) // keep all - state1.evict(t0.plusMillis(2), keepNumberOfEntries = 100) shouldBe state1 - - // keep 4 - val state3 = state1.evict(t0.plusMillis(2), keepNumberOfEntries = 4) - state3.oldestTimestamp shouldBe t0.plusMillis(1) - state3.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p5 -> 5L) + state1.evict(slice = 645, timeWindow = JDuration.ofMillis(1000)) shouldBe state1 + + // evict older than time window + val state2 = state1.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + state2.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p2 -> 2L, p3 -> 3L, p4 -> 4L, p6 -> 6L) + + val state3 = state1.add(Vector(createRecord(p5, 5, t0.plusMillis(100)), createRecord(p7, 7, t0.plusMillis(10)))) + val state4 = state3.evict(slice = 645, timeWindow = JDuration.ofMillis(2)) + state4.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map(p5 -> 5L, p6 -> 6L, p7 -> 7L) + + val state5 = state3.evict(slice = 905, timeWindow = JDuration.ofMillis(2)) + state5.byPid.map { case (pid, r) => pid -> r.seqNr } shouldBe Map( + p1 -> 1L, + p2 -> 2L, + p3 -> 3L, + p4 -> 4L, + p5 -> 5L, + p7 -> 7L) } "find duplicate" in { diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala index c148ca620..b0d705b1d 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala @@ -33,6 +33,7 @@ import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.Duplicat import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.RejectedBacktrackingSeqNr import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Validation.RejectedSeqNr import akka.projection.dynamodb.internal.OffsetPidSeqNr +import akka.projection.dynamodb.internal.OffsetStoreDao import akka.projection.dynamodb.internal.OffsetStoreDao.OffsetStoreAttributes import akka.projection.internal.ManagementState import com.typesafe.config.Config @@ -42,13 +43,7 @@ import org.scalatest.wordspec.AnyWordSpecLike import org.slf4j.LoggerFactory object DynamoDBTimestampOffsetStoreSpec { - val config: Config = - ConfigFactory - .parseString(""" - # to be able to test eviction - akka.projection.dynamodb.offset-store.keep-number-of-entries = 0 - """) - .withFallback(TestConfig.config) + val config: Config = TestConfig.config def configWithOffsetTTL: Config = ConfigFactory @@ -808,7 +803,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) "evict old records from same slice" in { val projectionId = genRandomProjectionId() - val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10)) + val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) import evictSettings._ val offsetStore = createOffsetStore(projectionId, evictSettings) @@ -827,34 +822,22 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)), - p4, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)), - p5, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)), - p6, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L)) .futureValue offsetStore.getState().size shouldBe 6 @@ -864,81 +847,59 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) offsetStore.getState().size shouldBe 7 // nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)), - p8, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L)) .futureValue offsetStore.getState().size shouldBe 8 // still nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)), - p8, - 2L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L)) .futureValue offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8) offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)), - p8, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L)) .futureValue offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) } "evict old records from different slices" in { val projectionId = genRandomProjectionId() - val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10)) + val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) import evictSettings._ val offsetStore = createOffsetStore(projectionId, evictSettings) val startTime = TestClock.nowMicros().instant() log.debug("Start time [{}]", startTime) - val p1 = "p500" // slice 645 - val p2 = "p92" // slice 905 - val p3 = "p108" // slice 905 - val p4 = "p863" // slice 645 - val p5 = "p984" // slice 645 - val p6 = "p3080" // slice 645 - val p7 = "p4290" // slice 645 - val p8 = "p20180" // slice 645 + // these pids have the same slice 645 + val p1 = "p500" + val p2 = "p621" + val p3 = "p742" + val p4 = "p863" + val p5 = "p984" + val p6 = "p3080" + val p7 = "p4290" + val p8 = "p20180" + val p9 = "p-0960" // slice 576 offsetStore.saveOffset(OffsetPidSeqNr(TimestampOffset(startTime, Map(p1 -> 1L)), p1, 1L)).futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(1)), Map(p2 -> 1L)), p2, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(1), Map(p2 -> 1L)), p2, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(JDuration.ofSeconds(2)), Map(p3 -> 1L)), p3, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(2), Map(p3 -> 1L)), p3, 1L)) .futureValue offsetStore - .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(evictInterval), Map(p4 -> 1L)), p4, 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(3), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(1)), Map(p4 -> 1L)), - p4, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p4 -> 1L)), p4, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(2)), Map(p5 -> 1L)), - p5, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(5), Map(p5 -> 1L)), p5, 1L)) .futureValue offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(evictInterval).plus(JDuration.ofSeconds(3)), Map(p6 -> 1L)), - p6, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusSeconds(4), Map(p6 -> 1L)), p6, 3L)) .futureValue offsetStore.getState().size shouldBe 6 @@ -948,31 +909,46 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) offsetStore.getState().size shouldBe 7 // nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).minusSeconds(3)), Map(p8 -> 1L)), - p8, - 1L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.minusSeconds(1)), Map(p8 -> 1L)), p8, 1L)) .futureValue offsetStore.getState().size shouldBe 8 // still nothing evicted yet offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(1)), Map(p8 -> 2L)), - p8, - 2L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(4)), Map(p8 -> 2L)), p8, 2L)) .futureValue offsetStore.getState().byPid.keySet shouldBe Set(p5, p6, p7, p8) offsetStore - .saveOffset( - OffsetPidSeqNr( - TimestampOffset(startTime.plus(timeWindow.plus(evictInterval).plusSeconds(20)), Map(p8 -> 3L)), - p8, - 3L)) + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plus(timeWindow.plusSeconds(20)), Map(p8 -> 3L)), p8, 3L)) + .futureValue + offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) + + // save same slice, but behind + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1001), Map(p2 -> 2L)), p2, 2L)) .futureValue + // it's evicted immediately offsetStore.getState().byPid.keySet shouldBe Set(p7, p8) + val dao = new OffsetStoreDao(system, settings, projectionId, client) + // but still saved + dao.loadSequenceNumber(slice(p2), p2).futureValue.get.seqNr shouldBe 2 + // the timestamp was earlier than previously used for this slice, and therefore stored timestamp not changed + dao.loadTimestampOffset(slice(p2)).futureValue.get.timestamp shouldBe startTime.plus(timeWindow.plusSeconds(20)) + + // save another slice that hasn't been used before + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1002), Map(p9 -> 1L)), p9, 1L)) + .futureValue + offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8) + dao.loadSequenceNumber(slice(p9), p9).futureValue.get.seqNr shouldBe 1 + dao.loadTimestampOffset(slice(p9)).futureValue.get.timestamp shouldBe startTime.plusMillis(1002) + // and one more of that same slice + offsetStore + .saveOffset(OffsetPidSeqNr(TimestampOffset(startTime.plusMillis(1003), Map(p9 -> 2L)), p9, 2L)) + .futureValue + offsetStore.getState().byPid.keySet shouldBe Set(p9, p7, p8) + dao.loadSequenceNumber(slice(p9), p9).futureValue.get.seqNr shouldBe 2 + dao.loadTimestampOffset(slice(p9)).futureValue.get.timestamp shouldBe startTime.plusMillis(1003) } "start from slice offset" in { diff --git a/akka-projection-dynamodb/src/main/resources/reference.conf b/akka-projection-dynamodb/src/main/resources/reference.conf index 5fcf8ae32..3eb4f1a63 100644 --- a/akka-projection-dynamodb/src/main/resources/reference.conf +++ b/akka-projection-dynamodb/src/main/resources/reference.conf @@ -12,14 +12,6 @@ akka.projection.dynamodb { # within this time window from latest offset. time-window = 5 minutes - # Keep this number of entries. Don't evict old entries until this threshold - # has been reached. - keep-number-of-entries = 10000 - - # Remove old entries outside the time-window from the offset store memory - # with this frequency. - evict-interval = 10 seconds - # Trying to batch insert offsets in batches of this size. offset-batch-size = 20 diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala index 2c07be1d6..82dc79268 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/DynamoDBProjectionSettings.scala @@ -6,6 +6,7 @@ package akka.projection.dynamodb import java.time.{ Duration => JDuration } +import scala.annotation.nowarn import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.jdk.DurationConverters._ @@ -40,8 +41,8 @@ object DynamoDBProjectionSettings { timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"), useClient = config.getString("use-client"), timeWindow = config.getDuration("offset-store.time-window"), - keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"), - evictInterval = config.getDuration("offset-store.evict-interval"), + keepNumberOfEntries = 0, + evictInterval = JDuration.ZERO, warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"), offsetBatchSize = config.getInt("offset-store.offset-batch-size"), offsetSliceReadParallelism = config.getInt("offset-store.offset-slice-read-parallelism"), @@ -60,7 +61,9 @@ final class DynamoDBProjectionSettings private ( val timestampOffsetTable: String, val useClient: String, val timeWindow: JDuration, + @deprecated("Not used, evict is only based on time window", "1.6.2") val keepNumberOfEntries: Int, + @deprecated("Not used, evict is not periodic", "1.6.2") val evictInterval: JDuration, val warnAboutFilteredEventsInFlow: Boolean, val offsetBatchSize: Int, @@ -79,14 +82,17 @@ final class DynamoDBProjectionSettings private ( def withTimeWindow(timeWindow: JDuration): DynamoDBProjectionSettings = copy(timeWindow = timeWindow) + @deprecated("Not used, evict is only based on time window", "1.6.2") def withKeepNumberOfEntries(keepNumberOfEntries: Int): DynamoDBProjectionSettings = - copy(keepNumberOfEntries = keepNumberOfEntries) + this + @deprecated("Not used, evict is not periodic", "1.6.2") def withEvictInterval(evictInterval: FiniteDuration): DynamoDBProjectionSettings = - copy(evictInterval = evictInterval.toJava) + this + @deprecated("Not used, evict is not periodic", "1.6.2") def withEvictInterval(evictInterval: JDuration): DynamoDBProjectionSettings = - copy(evictInterval = evictInterval) + this def withWarnAboutFilteredEventsInFlow(warnAboutFilteredEventsInFlow: Boolean): DynamoDBProjectionSettings = copy(warnAboutFilteredEventsInFlow = warnAboutFilteredEventsInFlow) @@ -100,12 +106,11 @@ final class DynamoDBProjectionSettings private ( def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings = copy(timeToLiveSettings = timeToLiveSettings) + @nowarn("msg=deprecated") private def copy( timestampOffsetTable: String = timestampOffsetTable, useClient: String = useClient, timeWindow: JDuration = timeWindow, - keepNumberOfEntries: Int = keepNumberOfEntries, - evictInterval: JDuration = evictInterval, warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow, offsetBatchSize: Int = offsetBatchSize, offsetSliceReadParallelism: Int = offsetSliceReadParallelism, @@ -122,7 +127,7 @@ final class DynamoDBProjectionSettings private ( timeToLiveSettings) override def toString = - s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $keepNumberOfEntries, $evictInterval, $warnAboutFilteredEventsInFlow, $offsetBatchSize)" + s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $warnAboutFilteredEventsInFlow, $offsetBatchSize)" } object TimeToLiveSettings { diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index 41e1da16d..9219891ca 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -9,6 +9,7 @@ import java.time.{ Duration => JDuration } import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec +import scala.collection.immutable.TreeSet import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -43,7 +44,26 @@ private[projection] object DynamoDBOffsetStore { type SeqNr = Long type Pid = String - final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) + final case class Record(slice: Int, pid: Pid, seqNr: SeqNr, timestamp: Instant) extends Ordered[Record] { + + override def compare(that: Record): Int = { + val result = this.timestamp.compareTo(that.timestamp) + if (result == 0) { + if (this.slice == that.slice) + if (this.pid == that.pid) + if (this.seqNr == that.seqNr) + 0 + else + java.lang.Long.compare(this.seqNr, that.seqNr) + else + this.pid.compareTo(that.pid) + else Integer.compare(this.slice, that.slice) + } else { + result + } + } + } + final case class RecordWithOffset( record: Record, offset: TimestampOffset, @@ -53,19 +73,18 @@ private[projection] object DynamoDBOffsetStore { fromSnapshot: Boolean) object State { - val empty: State = State(Map.empty, Map.empty, Instant.EPOCH, 0) + val empty: State = State(Map.empty, Map.empty, Map.empty) def apply(offsetBySlice: Map[Int, TimestampOffset]): State = if (offsetBySlice.isEmpty) empty - else new State(Map.empty, offsetBySlice, Instant.EPOCH, 0) + else new State(Map.empty, Map.empty, offsetBySlice) } final case class State( byPid: Map[Pid, Record], - offsetBySlice: Map[Int, TimestampOffset], - oldestTimestamp: Instant, - sizeAfterEvict: Int) { + bySliceSorted: Map[Int, TreeSet[Record]], + offsetBySlice: Map[Int, TimestampOffset]) { def size: Int = byPid.size @@ -77,7 +96,7 @@ private[projection] object DynamoDBOffsetStore { if (offsetBySlice.isEmpty) TimestampOffset.Zero else offsetBySlice.valuesIterator.maxBy(_.timestamp) - def add(records: IndexedSeq[Record]): State = { + def add(records: Iterable[Record]): State = { records.foldLeft(this) { case (acc, r) => val newByPid = @@ -91,6 +110,12 @@ private[projection] object DynamoDBOffsetStore { acc.byPid.updated(r.pid, r) } + val newBySliceSorted = + acc.bySliceSorted.updated(r.slice, acc.bySliceSorted.get(r.slice) match { + case Some(existing) => existing + r + case None => TreeSet.empty[Record] + r + }) + val newOffsetBySlice = acc.offsetBySlice.get(r.slice) match { case Some(existing) => @@ -105,15 +130,7 @@ private[projection] object DynamoDBOffsetStore { acc.offsetBySlice.updated(r.slice, TimestampOffset(r.timestamp, Map(r.pid -> r.seqNr))) } - val newOldestTimestamp = - if (acc.oldestTimestamp == Instant.EPOCH) - r.timestamp // first record - else if (r.timestamp.isBefore(acc.oldestTimestamp)) - r.timestamp - else - acc.oldestTimestamp // this is the normal case - - acc.copy(byPid = newByPid, offsetBySlice = newOffsetBySlice, oldestTimestamp = newOldestTimestamp) + acc.copy(byPid = newByPid, bySliceSorted = newBySliceSorted, offsetBySlice = newOffsetBySlice) } } @@ -127,21 +144,22 @@ private[projection] object DynamoDBOffsetStore { } } - def window: JDuration = - JDuration.between(oldestTimestamp, latestTimestamp) - - private lazy val sortedByTimestamp: Vector[Record] = byPid.valuesIterator.toVector.sortBy(_.timestamp) - - def evict(until: Instant, keepNumberOfEntries: Int): State = { - if (oldestTimestamp.isBefore(until) && size > keepNumberOfEntries) { - val newState = State(offsetBySlice).add( - sortedByTimestamp - .take(size - keepNumberOfEntries) - .filterNot(_.timestamp.isBefore(until)) ++ sortedByTimestamp - .takeRight(keepNumberOfEntries)) - newState.copy(sizeAfterEvict = newState.size) - } else + def evict(slice: Int, timeWindow: JDuration): State = { + val recordsSortedByTimestamp = bySliceSorted.getOrElse(slice, TreeSet.empty[Record]) + if (recordsSortedByTimestamp.isEmpty) { this + } else { + val until = recordsSortedByTimestamp.last.timestamp.minus(timeWindow) + val filtered = recordsSortedByTimestamp.dropWhile(_.timestamp.isBefore(until)) + if (filtered.size == recordsSortedByTimestamp.size) { + this + } else { + val byPidOtherSlices = byPid.filterNot { case (_, r) => r.slice == slice } + val bySliceOtherSlices = bySliceSorted - slice + copy(byPid = byPidOtherSlices, bySliceSorted = bySliceOtherSlices) + .add(filtered) + } + } } } @@ -190,8 +208,6 @@ private[projection] class DynamoDBOffsetStore( private val dao = new OffsetStoreDao(system, settings, projectionId, client) - private val evictWindow = settings.timeWindow.plus(settings.evictInterval) - private[projection] implicit val executionContext: ExecutionContext = system.executionContext // The OffsetStore instance is used by a single projectionId and there shouldn't be many concurrent @@ -268,12 +284,7 @@ private[projection] class DynamoDBOffsetStore( offsetBySliceFut.map { offsetBySlice => val newState = State(offsetBySlice) - logger.debug( - "{} readTimestampOffset state with [{}] persistenceIds, oldest [{}], latest [{}]", - logPrefix, - newState.byPid.size, - newState.oldestTimestamp, - newState.latestTimestamp) + if (!state.compareAndSet(oldState, newState)) throw new IllegalStateException("Unexpected concurrent modification of state from readOffset.") clearInflight() @@ -283,10 +294,10 @@ private[projection] class DynamoDBOffsetStore( } else { if (logger.isDebugEnabled) logger.debug( - "{} readTimestampOffset earliest slice [{}], latest slice [{}]", + "{} readTimestampOffset state with [{}] persistenceIds, timestamp per slice [{}]", logPrefix, - offsetBySlice.minBy(_._1), - offsetBySlice.maxBy(_._1)) + newState.byPid.size, + offsetBySlice.iterator.map { case (slice, offset) => s"$slice -> ${offset.timestamp}" }.mkString(", ")) TimestampOffsetBySlice(offsetBySlice) } @@ -392,37 +403,29 @@ private[projection] class DynamoDBOffsetStore( } else { val newState = oldState.add(filteredRecords) - // accumulate some more than the timeWindow before evicting, and at least 10% increase of size - // for testing keepNumberOfEntries = 0 is used - val evictThresholdReached = - if (settings.keepNumberOfEntries == 0) true else newState.size > (newState.sizeAfterEvict * 1.1).toInt - val evictedNewState = - if (newState.size > settings.keepNumberOfEntries && evictThresholdReached && newState.window - .compareTo(evictWindow) > 0) { - // FIXME maybe this should take the slice into account - val evictUntil = newState.latestTimestamp.minus(settings.timeWindow) - val s = newState.evict(evictUntil, settings.keepNumberOfEntries) - logger.debug( - "{} Evicted [{}] records until [{}], keeping [{}] records. Latest [{}].", - logPrefix, - newState.size - s.size, - evictUntil, - s.size, - newState.latestTimestamp) - s - } else - newState + val slices = + if (filteredRecords.size == 1) Set(filteredRecords.head.slice) + else filteredRecords.iterator.map(_.slice).toSet + + val evictedNewState = slices.foldLeft(newState) { + case (s, slice) => s.evict(slice, settings.timeWindow) + } // FIXME we probably don't have to store the latest offset per slice all the time, but can // accumulate some changes and flush on size/time. - val changedOffsetBySlice = evictedNewState.offsetBySlice.filter { - case (slice, offset) => - offset != oldState.offsetBySlice.getOrElse(slice, TimestampOffset.Zero) - } + val changedOffsetBySlice = slices.flatMap { slice => + val newOffset = newState.offsetBySlice(slice) + val oldOffset = oldState.offsetBySlice.getOrElse(slice, TimestampOffset.Zero) + if (newOffset.timestamp.isBefore(oldOffset.timestamp)) None + else Some(slice -> newOffset) + }.toMap storeSequenceNumbers(filteredRecords).flatMap { _ => val storeOffsetsResult = - if (changedOffsetBySlice.isEmpty) FutureDone else dao.storeTimestampOffsets(changedOffsetBySlice) + if (changedOffsetBySlice.isEmpty) + FutureDone + else + dao.storeTimestampOffsets(changedOffsetBySlice) storeOffsetsResult.flatMap { _ => if (state.compareAndSet(oldState, evictedNewState)) { cleanupInflight(evictedNewState)