diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala index b8da7c43..bee4927a 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala @@ -4,6 +4,7 @@ package akka.persistence.r2dbc.cleanup.javadsl +import java.time.Instant import java.util.concurrent.CompletionStage import java.util.{ List => JList } @@ -64,6 +65,44 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup) def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).toJava + /** + * Delete events before a timestamp for the given persistence id. Snapshots are not deleted. + * + * This can be useful for `DurableStateBehavior` with change events, where the events are only used for the + * Projections and not for the recovery of the `DurableStateBehavior` state. The timestamp may correspond to the the + * offset timestamp of the Projections, if events are not needed after all Projections have processed them. + * + * Be aware of that if all events of a persistenceId are removed the sequence number will start from 1 again if an + * `EventSourcedBehavior` with the same persistenceId is used again. + * + * @param persistenceId + * the persistence id to delete for + * @param timestamp + * timestamp (exclusive) to delete up to + */ + def deleteEventsBefore(persistenceId: String, timestamp: Instant): CompletionStage[Done] = + delegate.deleteEventsBefore(persistenceId, timestamp).toJava + + /** + * Delete events before a timestamp for the given entityType and slice. Snapshots are not deleted. + * + * This can be useful for `DurableStateBehavior` with change events, where the events are only used for the + * Projections and not for the recovery of the `DurableStateBehavior` state. The timestamp may correspond to the the + * offset timestamp of the Projections, if events are not needed after all Projections have processed them. + * + * Be aware of that if all events of a persistenceId are removed the sequence number will start from 1 again if an + * `EventSourcedBehavior` with the same persistenceId is used again. + * + * @param entityType + * the entity type to delete for + * @param slice + * the slice to delete for + * @param timestamp + * timestamp (exclusive) to delete up to + */ + def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): CompletionStage[Done] = + delegate.deleteEventsBefore(entityType, slice, timestamp).toJava + /** * Delete snapshots related to one single `persistenceId`. Events are not deleted. */ diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala index c77dcbc5..77cf4078 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala @@ -4,6 +4,8 @@ package akka.persistence.r2dbc.cleanup.scaladsl +import java.time.Instant + import scala.collection.immutable import scala.concurrent.Future import scala.util.Failure @@ -92,6 +94,48 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf foreach(persistenceIds, "deleteAllEvents", pid => deleteAllEvents(pid, resetSequenceNumber)) } + /** + * Delete events before a timestamp for the given persistence id. Snapshots are not deleted. + * + * This can be useful for `DurableStateBehavior` with change events, where the events are only used for the + * Projections and not for the recovery of the `DurableStateBehavior` state. The timestamp may correspond to the the + * offset timestamp of the Projections, if events are not needed after all Projections have processed them. + * + * Be aware of that if all events of a persistenceId are removed the sequence number will start from 1 again if an + * `EventSourcedBehavior` with the same persistenceId is used again. + * + * @param persistenceId + * the persistence id to delete for + * @param timestamp + * timestamp (exclusive) to delete up to + */ + def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Done] = { + log.debug("deleteEventsBefore persistenceId [{}], timestamp [{}]", persistenceId, timestamp) + journalDao.deleteEventsBefore(persistenceId, timestamp).map(_ => Done) + } + + /** + * Delete events before a timestamp for the given entityType and slice. Snapshots are not deleted. + * + * This can be useful for `DurableStateBehavior` with change events, where the events are only used for the + * Projections and not for the recovery of the `DurableStateBehavior` state. The timestamp may correspond to the the + * offset timestamp of the Projections, if events are not needed after all Projections have processed them. + * + * Be aware of that if all events of a persistenceId are removed the sequence number will start from 1 again if an + * `EventSourcedBehavior` with the same persistenceId is used again. + * + * @param entityType + * the entity type to delete for + * @param slice + * the slice to delete for + * @param timestamp + * timestamp (exclusive) to delete up to + */ + def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Done] = { + log.debug("deleteEventsBefore [{}], slice [{}] timestamp [{}]", entityType, slice, timestamp) + journalDao.deleteEventsBefore(entityType, slice, timestamp).map(_ => Done) + } + /** * Delete snapshots related to one single `persistenceId`. Events are not deleted. */ diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala index ac64692f..6b6d96fb 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/JournalDao.scala @@ -68,4 +68,8 @@ private[r2dbc] trait JournalDao { def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit] + def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Unit] + + def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit] + } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index 6358c5bf..afed3b93 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -122,6 +122,14 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?, ?, ?, ?)""" + private val deleteEventsByPersistenceIdBeforeTimestampSql = sql""" + DELETE FROM $journalTable + WHERE persistence_id = ? AND db_timestamp < ?""" + + private val deleteEventsBySliceBeforeTimestampSql = sql""" + DELETE FROM $journalTable + WHERE slice = ? AND entity_type = ? AND db_timestamp < ?""" + /** * All events must be for the same persistenceId. * @@ -363,4 +371,35 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti } yield () } + override def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Unit] = { + r2dbcExecutor + .updateOne(s"delete [$persistenceId]") { connection => + connection + .createStatement(deleteEventsByPersistenceIdBeforeTimestampSql) + .bind(0, persistenceId) + .bind(1, timestamp) + } + .map(deletedRows => + log.debugN("Deleted [{}] events for persistenceId [{}], before [{}]", deletedRows, persistenceId, timestamp))( + ExecutionContexts.parasitic) + } + + override def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit] = { + r2dbcExecutor + .updateOne(s"delete [$entityType]") { connection => + connection + .createStatement(deleteEventsBySliceBeforeTimestampSql) + .bind(0, slice) + .bind(1, entityType) + .bind(2, timestamp) + } + .map(deletedRows => + log.debugN( + "Deleted [{}] events for entityType [{}], slice [{}], before [{}]", + deletedRows, + entityType, + slice, + timestamp))(ExecutionContexts.parasitic) + } + } diff --git a/core/src/test/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala index 9275f865..c5e60b2c 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanupSpec.scala @@ -4,6 +4,8 @@ package akka.persistence.r2dbc.cleanup.scaladsl +import scala.concurrent.duration._ + import akka.Done import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -16,9 +18,18 @@ import akka.persistence.r2dbc.TestDbLifecycle import akka.persistence.typed.PersistenceId import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike + import akka.actor.testkit.typed.scaladsl.LoggingTestKit import org.slf4j.event.Level +import akka.persistence.query.Offset +import akka.persistence.query.PersistenceQuery +import akka.persistence.query.TimestampOffset +import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery +import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery +import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal +import akka.stream.scaladsl.Sink + object EventSourcedCleanupSpec { val config = ConfigFactory .parseString(s""" @@ -333,6 +344,82 @@ class EventSourcedCleanupSpec stateProbe.expectMessage("4|5|6|7|8|9|10") } } + + "delete events for one persistenceId before timestamp" in { + val ackProbe = createTestProbe[Done]() + val pid = nextPid() + val p = spawn(Persister(pid)) + + (1 to 10).foreach { n => + p ! Persister.PersistWithAck(n, ackProbe.ref) + ackProbe.expectMessage(Done) + ackProbe.expectNoMessage(1.millis) // just to be sure that events have different timestamps + } + + testKit.stop(p) + + val journalQuery = + PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdTypedQuery](R2dbcReadJournal.Identifier) + val eventsBefore = + journalQuery.currentEventsByPersistenceIdTyped[Any](pid, 1L, Long.MaxValue).runWith(Sink.seq).futureValue + eventsBefore.size shouldBe 10 + + val cleanup = new EventSourcedCleanup(system) + val timestamp = eventsBefore.last.offset.asInstanceOf[TimestampOffset].timestamp + cleanup.deleteEventsBefore(pid, timestamp).futureValue + + val eventsAfter = + journalQuery.currentEventsByPersistenceIdTyped[Any](pid, 1L, Long.MaxValue).runWith(Sink.seq).futureValue + eventsAfter.size shouldBe 1 + eventsAfter.head.sequenceNr shouldBe eventsBefore.last.sequenceNr + } + + "delete events for slice before timestamp" in { + val ackProbe = createTestProbe[Done]() + val entityType = nextEntityType() + val pid1 = PersistenceId(entityType, "a") + val pid2 = PersistenceId(entityType, "b") + persistenceExt.sliceForPersistenceId(pid1.id) should not be persistenceExt.sliceForPersistenceId(pid2.id) + + val p1 = spawn(Persister(pid1)) + val p2 = spawn(Persister(pid2)) + + (1 to 10).foreach { n => + val p = if (n % 2 == 0) p2 else p1 + p ! Persister.PersistWithAck(n, ackProbe.ref) + ackProbe.expectMessage(Done) + ackProbe.expectNoMessage(1.millis) // just to be sure that events have different timestamps + } + + testKit.stop(p1) + testKit.stop(p2) + + val journalQuery = + PersistenceQuery(system).readJournalFor[CurrentEventsBySliceQuery](R2dbcReadJournal.Identifier) + val eventsBefore = + journalQuery + .currentEventsBySlices[Any](entityType, 0, persistenceExt.numberOfSlices - 1, Offset.noOffset) + .runWith(Sink.seq) + .futureValue + eventsBefore.size shouldBe 10 + eventsBefore.last.persistenceId shouldBe pid2.id + + // we remove all except last for p2, and p1 should remain untouched + val cleanup = new EventSourcedCleanup(system) + val timestamp = eventsBefore.last.offset.asInstanceOf[TimestampOffset].timestamp + val slice = persistenceExt.sliceForPersistenceId(eventsBefore.last.persistenceId) + cleanup.deleteEventsBefore(entityType, slice, timestamp).futureValue + + val eventsAfter = + journalQuery + .currentEventsBySlices[Any](entityType, 0, persistenceExt.numberOfSlices - 1, Offset.noOffset) + .runWith(Sink.seq) + .futureValue + eventsAfter.count(_.persistenceId == pid1.id) shouldBe 5 + eventsAfter.count(_.persistenceId == pid2.id) shouldBe 1 + eventsAfter.size shouldBe 5 + 1 + eventsAfter.filter(_.persistenceId == pid2.id).last.sequenceNr shouldBe eventsBefore.last.sequenceNr + } } } diff --git a/docs/src/main/paradox/cleanup.md b/docs/src/main/paradox/cleanup.md index fe9dc97e..60c9c85c 100644 --- a/docs/src/main/paradox/cleanup.md +++ b/docs/src/main/paradox/cleanup.md @@ -30,6 +30,7 @@ not emit further events after that and typically stop itself if it receives more * Delete all events for one or many persistence ids * Delete all snapshots for one or many persistence ids * Delete events before snapshot for one or many persistence ids +* Delete events before a timestamp The cleanup tool can be combined with the @ref[query plugin](./query.md) which has a query to get all persistence ids.