Skip to content

Commit

Permalink
feat: Cleanup events by time (#495)
Browse files Browse the repository at this point in the history
* This can be useful for `DurableStateBehavior` with change events, where the
  events are only used for the Projections and not for the recovery of the
  `DurableStateBehavior` state. The timestamp may correspond to the
  offset timestamp of the Projections, if events are not needed after all
  Projections have processed them.
  • Loading branch information
patriknw authored Dec 19, 2023
1 parent b5c077e commit bbf5224
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.r2dbc.cleanup.javadsl

import java.time.Instant
import java.util.concurrent.CompletionStage
import java.util.{ List => JList }

Expand Down Expand Up @@ -64,6 +65,44 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup)
def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = {
  // Copy the Java list into an immutable Scala Vector before handing it to the scaladsl delegate.
  val ids = persistenceIds.asScala.toVector
  delegate.deleteAllEvents(ids, resetSequenceNumber).toJava
}

/**
 * Deletes the events of a single persistence id that are older than the given timestamp. Snapshots are left
 * untouched.
 *
 * A typical use case is a `DurableStateBehavior` with change events, where the events are consumed only by the
 * Projections and are never needed for the recovery of the `DurableStateBehavior` state. The timestamp may then
 * correspond to the offset timestamp of the Projections, if events are not needed after all Projections have
 * processed them.
 *
 * Be aware that if all events of a persistenceId are removed, the sequence number will start from 1 again if an
 * `EventSourcedBehavior` with the same persistenceId is used again.
 *
 * @param persistenceId
 *   the persistence id to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(persistenceId: String, timestamp: Instant): CompletionStage[Done] = {
  // Run the deletion through the scaladsl delegate, then expose the result as a CompletionStage.
  val deletion = delegate.deleteEventsBefore(persistenceId, timestamp)
  deletion.toJava
}

/**
 * Deletes the events of one entity type and slice that are older than the given timestamp. Snapshots are left
 * untouched.
 *
 * A typical use case is a `DurableStateBehavior` with change events, where the events are consumed only by the
 * Projections and are never needed for the recovery of the `DurableStateBehavior` state. The timestamp may then
 * correspond to the offset timestamp of the Projections, if events are not needed after all Projections have
 * processed them.
 *
 * Be aware that if all events of a persistenceId are removed, the sequence number will start from 1 again if an
 * `EventSourcedBehavior` with the same persistenceId is used again.
 *
 * @param entityType
 *   the entity type to delete for
 * @param slice
 *   the slice to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): CompletionStage[Done] = {
  // Run the deletion through the scaladsl delegate, then expose the result as a CompletionStage.
  val deletion = delegate.deleteEventsBefore(entityType, slice, timestamp)
  deletion.toJava
}

/**
* Delete snapshots related to one single `persistenceId`. Events are not deleted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.r2dbc.cleanup.scaladsl

import java.time.Instant

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Failure
Expand Down Expand Up @@ -92,6 +94,48 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf
foreach(persistenceIds, "deleteAllEvents", pid => deleteAllEvents(pid, resetSequenceNumber))
}

/**
 * Deletes the events of a single persistence id that are older than the given timestamp. Snapshots are left
 * untouched.
 *
 * A typical use case is a `DurableStateBehavior` with change events, where the events are consumed only by the
 * Projections and are never needed for the recovery of the `DurableStateBehavior` state. The timestamp may then
 * correspond to the offset timestamp of the Projections, if events are not needed after all Projections have
 * processed them.
 *
 * Be aware that if all events of a persistenceId are removed, the sequence number will start from 1 again if an
 * `EventSourcedBehavior` with the same persistenceId is used again.
 *
 * @param persistenceId
 *   the persistence id to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Done] = {
  log.debug("deleteEventsBefore persistenceId [{}], timestamp [{}]", persistenceId, timestamp)
  // The DAO completes with Unit; callers of the cleanup API get Done instead.
  for (_ <- journalDao.deleteEventsBefore(persistenceId, timestamp)) yield Done
}

/**
 * Delete events before a timestamp for the given entityType and slice. Snapshots are not deleted.
 *
 * This can be useful for `DurableStateBehavior` with change events, where the events are only used for the
 * Projections and not for the recovery of the `DurableStateBehavior` state. The timestamp may correspond to the
 * offset timestamp of the Projections, if events are not needed after all Projections have processed them.
 *
 * Be aware that if all events of a persistenceId are removed, the sequence number will start from 1 again if an
 * `EventSourcedBehavior` with the same persistenceId is used again.
 *
 * @param entityType
 *   the entity type to delete for
 * @param slice
 *   the slice to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Done] = {
  // Label every placeholder so the message reads like the persistenceId variant of this method.
  log.debug("deleteEventsBefore entityType [{}], slice [{}], timestamp [{}]", entityType, slice, timestamp)
  // The DAO completes with Unit; callers of the cleanup API get Done instead.
  journalDao.deleteEventsBefore(entityType, slice, timestamp).map(_ => Done)
}

/**
* Delete snapshots related to one single `persistenceId`. Events are not deleted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,8 @@ private[r2dbc] trait JournalDao {

// NOTE(review): presumably deletes the events of `persistenceId` up to `toSequenceNr`; whether the bound is
// inclusive and what `resetSequenceNumber` changes is not visible from this declaration - confirm against the
// implementation.
def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit]

/**
 * Delete the events of the given `persistenceId` that are older than `timestamp` (exclusive).
 */
def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Unit]

/**
 * Delete the events of the given `entityType` and `slice` that are older than `timestamp` (exclusive).
 */
def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit]

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?, ?, ?, ?)"""

// Deletes the journal rows of one persistence id with a db_timestamp strictly before the bound timestamp.
// Bind parameters, in order: persistence_id, db_timestamp upper bound (exclusive).
private val deleteEventsByPersistenceIdBeforeTimestampSql = sql"""
DELETE FROM $journalTable
WHERE persistence_id = ? AND db_timestamp < ?"""

// Deletes the journal rows of one slice and entity type with a db_timestamp strictly before the bound timestamp.
// Bind parameters, in order: slice, entity_type, db_timestamp upper bound (exclusive).
private val deleteEventsBySliceBeforeTimestampSql = sql"""
DELETE FROM $journalTable
WHERE slice = ? AND entity_type = ? AND db_timestamp < ?"""

/**
* All events must be for the same persistenceId.
*
Expand Down Expand Up @@ -363,4 +371,35 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
} yield ()
}

/**
 * Deletes the journal rows of `persistenceId` whose `db_timestamp` is before `timestamp` (exclusive) and logs
 * the row count reported by the executor at debug level.
 */
override def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Unit] = {
  val deletedRows = r2dbcExecutor.updateOne(s"delete [$persistenceId]") { connection =>
    val statement = connection.createStatement(deleteEventsByPersistenceIdBeforeTimestampSql)
    // Bind order follows the placeholders in the SQL: persistence_id, then the timestamp bound.
    statement.bind(0, persistenceId).bind(1, timestamp)
  }

  deletedRows.map { count =>
    log.debugN("Deleted [{}] events for persistenceId [{}], before [{}]", count, persistenceId, timestamp)
  }(ExecutionContexts.parasitic)
}

/**
 * Deletes the journal rows of the given `entityType` and `slice` whose `db_timestamp` is before `timestamp`
 * (exclusive) and logs the row count reported by the executor at debug level.
 */
override def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit] = {
  val deletedRows = r2dbcExecutor.updateOne(s"delete [$entityType]") { connection =>
    val statement = connection.createStatement(deleteEventsBySliceBeforeTimestampSql)
    // Bind order follows the placeholders in the SQL: slice, entity_type, then the timestamp bound.
    statement.bind(0, slice).bind(1, entityType).bind(2, timestamp)
  }

  deletedRows.map { count =>
    log.debugN(
      "Deleted [{}] events for entityType [{}], slice [{}], before [{}]",
      count,
      entityType,
      slice,
      timestamp)
  }(ExecutionContexts.parasitic)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.r2dbc.cleanup.scaladsl

import scala.concurrent.duration._

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand All @@ -16,9 +18,18 @@ import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import org.slf4j.event.Level

import akka.persistence.query.Offset
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.stream.scaladsl.Sink

object EventSourcedCleanupSpec {
val config = ConfigFactory
.parseString(s"""
Expand Down Expand Up @@ -333,6 +344,82 @@ class EventSourcedCleanupSpec
stateProbe.expectMessage("4|5|6|7|8|9|10")
}
}

"delete events for one persistenceId before timestamp" in {
  val ackProbe = createTestProbe[Done]()
  val pid = nextPid()
  val persister = spawn(Persister(pid))

  (1 to 10).foreach { n =>
    persister ! Persister.PersistWithAck(n, ackProbe.ref)
    ackProbe.expectMessage(Done)
    ackProbe.expectNoMessage(1.millis) // just to be sure that events have different timestamps
  }

  testKit.stop(persister)

  val journalQuery =
    PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdTypedQuery](R2dbcReadJournal.Identifier)

  // Reads all events currently stored for the pid; used before and after the cleanup.
  def storedEvents() =
    journalQuery.currentEventsByPersistenceIdTyped[Any](pid, 1L, Long.MaxValue).runWith(Sink.seq).futureValue

  val eventsBefore = storedEvents()
  eventsBefore.size shouldBe 10

  // The timestamp of the last event is the exclusive cut-off, so exactly that event must survive.
  // Pattern match instead of an unchecked cast so that an unexpected offset type fails with a clear message.
  val timestamp = eventsBefore.last.offset match {
    case offset: TimestampOffset => offset.timestamp
    case other                   => fail(s"Expected a TimestampOffset but got [$other]")
  }

  val cleanup = new EventSourcedCleanup(system)
  cleanup.deleteEventsBefore(pid, timestamp).futureValue

  val eventsAfter = storedEvents()
  eventsAfter.size shouldBe 1
  eventsAfter.head.sequenceNr shouldBe eventsBefore.last.sequenceNr
}

"delete events for slice before timestamp" in {
  val ackProbe = createTestProbe[Done]()
  val entityType = nextEntityType()
  val pid1 = PersistenceId(entityType, "a")
  val pid2 = PersistenceId(entityType, "b")
  // The test only makes sense when the two persistence ids live in different slices.
  persistenceExt.sliceForPersistenceId(pid1.id) should not be persistenceExt.sliceForPersistenceId(pid2.id)

  val p1 = spawn(Persister(pid1))
  val p2 = spawn(Persister(pid2))

  // Odd numbers go to p1 and even numbers to p2, so each gets 5 events and the last one belongs to p2.
  (1 to 10).foreach { n =>
    val target = if (n % 2 == 0) p2 else p1
    target ! Persister.PersistWithAck(n, ackProbe.ref)
    ackProbe.expectMessage(Done)
    ackProbe.expectNoMessage(1.millis) // just to be sure that events have different timestamps
  }

  testKit.stop(p1)
  testKit.stop(p2)

  val journalQuery =
    PersistenceQuery(system).readJournalFor[CurrentEventsBySliceQuery](R2dbcReadJournal.Identifier)

  // Reads all events currently stored for the entity type across every slice; used before and after the cleanup.
  def storedEvents() =
    journalQuery
      .currentEventsBySlices[Any](entityType, 0, persistenceExt.numberOfSlices - 1, Offset.noOffset)
      .runWith(Sink.seq)
      .futureValue

  val eventsBefore = storedEvents()
  eventsBefore.size shouldBe 10
  eventsBefore.last.persistenceId shouldBe pid2.id

  // we remove all except last for p2, and p1 should remain untouched
  // Pattern match instead of an unchecked cast so that an unexpected offset type fails with a clear message.
  val timestamp = eventsBefore.last.offset match {
    case offset: TimestampOffset => offset.timestamp
    case other                   => fail(s"Expected a TimestampOffset but got [$other]")
  }
  val slice = persistenceExt.sliceForPersistenceId(eventsBefore.last.persistenceId)
  val cleanup = new EventSourcedCleanup(system)
  cleanup.deleteEventsBefore(entityType, slice, timestamp).futureValue

  val eventsAfter = storedEvents()
  eventsAfter.count(_.persistenceId == pid1.id) shouldBe 5
  eventsAfter.count(_.persistenceId == pid2.id) shouldBe 1
  eventsAfter.size shouldBe 5 + 1
  eventsAfter.filter(_.persistenceId == pid2.id).last.sequenceNr shouldBe eventsBefore.last.sequenceNr
}
}

}
1 change: 1 addition & 0 deletions docs/src/main/paradox/cleanup.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ not emit further events after that and typically stop itself if it receives more
* Delete all events for one or many persistence ids
* Delete all snapshots for one or many persistence ids
* Delete events before snapshot for one or many persistence ids
* Delete events before a timestamp

The cleanup tool can be combined with the @ref[query plugin](./query.md) which has a query to get all persistence ids.

Expand Down

0 comments on commit bbf5224

Please sign in to comment.