Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Cleanup events by time #495

Merged
merged 1 commit into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.r2dbc.cleanup.javadsl

import java.time.Instant
import java.util.concurrent.CompletionStage
import java.util.{ List => JList }

Expand Down Expand Up @@ -64,6 +65,44 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup)
def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = {
  // Copy the Java list into a Vector before handing it to the Scala DSL; the resulting
  // Future is then exposed to Java callers as a CompletionStage.
  val ids = persistenceIds.asScala.toVector
  delegate.deleteAllEvents(ids, resetSequenceNumber).toJava
}

/**
 * Remove the events of a single persistence id that were stored before the given timestamp. Snapshots are left
 * untouched.
 *
 * Typical use is a `DurableStateBehavior` with change events: there the events only feed the Projections and take
 * no part in recovering the `DurableStateBehavior` state. If the events are not needed once all Projections have
 * processed them, the offset timestamp of the Projections is a suitable value for `timestamp`.
 *
 * Be aware that if every event of a persistenceId is removed, the sequence number starts from 1 again when an
 * `EventSourcedBehavior` with the same persistenceId is used later.
 *
 * @param persistenceId
 *   the persistence id to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(persistenceId: String, timestamp: Instant): CompletionStage[Done] = {
  // Run the deletion through the Scala DSL and expose its Future as a CompletionStage.
  val deletion = delegate.deleteEventsBefore(persistenceId, timestamp)
  deletion.toJava
}

/**
 * Remove the events of one entity type and slice that were stored before the given timestamp. Snapshots are left
 * untouched.
 *
 * Typical use is a `DurableStateBehavior` with change events: there the events only feed the Projections and take
 * no part in recovering the `DurableStateBehavior` state. If the events are not needed once all Projections have
 * processed them, the offset timestamp of the Projections is a suitable value for `timestamp`.
 *
 * Be aware that if every event of a persistenceId is removed, the sequence number starts from 1 again when an
 * `EventSourcedBehavior` with the same persistenceId is used later.
 *
 * @param entityType
 *   the entity type to delete for
 * @param slice
 *   the slice to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): CompletionStage[Done] = {
  // Run the deletion through the Scala DSL and expose its Future as a CompletionStage.
  val deletion = delegate.deleteEventsBefore(entityType, slice, timestamp)
  deletion.toJava
}

/**
* Delete snapshots related to one single `persistenceId`. Events are not deleted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.r2dbc.cleanup.scaladsl

import java.time.Instant

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Failure
Expand Down Expand Up @@ -92,6 +94,48 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf
foreach(persistenceIds, "deleteAllEvents", pid => deleteAllEvents(pid, resetSequenceNumber))
}

/**
 * Remove the events of a single persistence id that were stored before the given timestamp. Snapshots are left
 * untouched.
 *
 * Typical use is a `DurableStateBehavior` with change events: there the events only feed the Projections and take
 * no part in recovering the `DurableStateBehavior` state. If the events are not needed once all Projections have
 * processed them, the offset timestamp of the Projections is a suitable value for `timestamp`.
 *
 * Be aware that if every event of a persistenceId is removed, the sequence number starts from 1 again when an
 * `EventSourcedBehavior` with the same persistenceId is used later.
 *
 * @param persistenceId
 *   the persistence id to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Done] = {
  log.debug("deleteEventsBefore persistenceId [{}], timestamp [{}]", persistenceId, timestamp)
  // The DAO yields Future[Unit]; callers of the cleanup API get Done instead.
  val deletion = journalDao.deleteEventsBefore(persistenceId, timestamp)
  deletion.map { _ =>
    Done
  }
}

/**
 * Remove the events of one entity type and slice that were stored before the given timestamp. Snapshots are left
 * untouched.
 *
 * Typical use is a `DurableStateBehavior` with change events: there the events only feed the Projections and take
 * no part in recovering the `DurableStateBehavior` state. If the events are not needed once all Projections have
 * processed them, the offset timestamp of the Projections is a suitable value for `timestamp`.
 *
 * Be aware that if every event of a persistenceId is removed, the sequence number starts from 1 again when an
 * `EventSourcedBehavior` with the same persistenceId is used later.
 *
 * @param entityType
 *   the entity type to delete for
 * @param slice
 *   the slice to delete for
 * @param timestamp
 *   timestamp (exclusive) to delete up to
 */
def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Done] = {
  log.debug("deleteEventsBefore [{}], slice [{}] timestamp [{}]", entityType, slice, timestamp)
  // The DAO yields Future[Unit]; callers of the cleanup API get Done instead.
  val deletion = journalDao.deleteEventsBefore(entityType, slice, timestamp)
  deletion.map { _ =>
    Done
  }
}

/**
* Delete snapshots related to one single `persistenceId`. Events are not deleted.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,8 @@ private[r2dbc] trait JournalDao {

// NOTE(review): presumably deletes the events of `persistenceId` up to `toSequenceNr`; whether the
// bound is inclusive and what `resetSequenceNumber` changes is decided by the implementations,
// which are not visible here — confirm before relying on it.
def deleteEventsTo(persistenceId: String, toSequenceNr: Long, resetSequenceNumber: Boolean): Future[Unit]

/**
 * Delete the events of `persistenceId` that were stored before `timestamp` (exclusive).
 */
def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Unit]

/**
 * Delete the events of the given `entityType` and `slice` that were stored before `timestamp` (exclusive).
 */
def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit]

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload, deleted)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?, ?, ?, ?)"""

// Removes every journal row of one persistence id whose db_timestamp is strictly earlier than
// the bound timestamp. Positional parameters, in order: persistence_id, timestamp.
private val deleteEventsByPersistenceIdBeforeTimestampSql = sql"""
DELETE FROM $journalTable
WHERE persistence_id = ? AND db_timestamp < ?"""

// Removes every journal row of one slice and entity type whose db_timestamp is strictly earlier
// than the bound timestamp. Positional parameters, in order: slice, entity_type, timestamp.
private val deleteEventsBySliceBeforeTimestampSql = sql"""
DELETE FROM $journalTable
WHERE slice = ? AND entity_type = ? AND db_timestamp < ?"""

/**
* All events must be for the same persistenceId.
*
Expand Down Expand Up @@ -339,4 +347,35 @@ private[r2dbc] class PostgresJournalDao(journalSettings: R2dbcSettings, connecti
} yield ()
}

/**
 * Deletes the journal rows of `persistenceId` whose `db_timestamp` is before `timestamp` (exclusive) and logs the
 * number of deleted rows at debug level.
 */
override def deleteEventsBefore(persistenceId: String, timestamp: Instant): Future[Unit] = {
  val deletedCount = r2dbcExecutor.updateOne(s"delete [$persistenceId]") { conn =>
    // bind indexes follow the order of the `?` placeholders in the SQL
    val statement = conn.createStatement(deleteEventsByPersistenceIdBeforeTimestampSql)
    statement.bind(0, persistenceId).bind(1, timestamp)
  }

  deletedCount.map(count =>
    log.debugN("Deleted [{}] events for persistenceId [{}], before [{}]", count, persistenceId, timestamp))(
    ExecutionContexts.parasitic)
}

/**
 * Deletes the journal rows of the given `entityType` and `slice` whose `db_timestamp` is before `timestamp`
 * (exclusive) and logs the number of deleted rows at debug level.
 */
override def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit] = {
  val deletedCount = r2dbcExecutor.updateOne(s"delete [$entityType]") { conn =>
    // bind indexes follow the order of the `?` placeholders in the SQL: slice, entity_type, timestamp
    val statement = conn.createStatement(deleteEventsBySliceBeforeTimestampSql)
    statement.bind(0, slice).bind(1, entityType).bind(2, timestamp)
  }

  deletedCount.map(count =>
    log.debugN("Deleted [{}] events for entityType [{}], slice [{}], before [{}]", count, entityType, slice, timestamp))(
    ExecutionContexts.parasitic)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.r2dbc.cleanup.scaladsl

import scala.concurrent.duration._

import akka.Done
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand All @@ -16,9 +18,18 @@ import akka.persistence.r2dbc.TestDbLifecycle
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import org.slf4j.event.Level

import akka.persistence.query.Offset
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.TimestampOffset
import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery
import akka.persistence.query.typed.scaladsl.CurrentEventsBySliceQuery
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.stream.scaladsl.Sink

object EventSourcedCleanupSpec {
val config = ConfigFactory
.parseString(s"""
Expand Down Expand Up @@ -333,6 +344,82 @@ class EventSourcedCleanupSpec
stateProbe.expectMessage("4|5|6|7|8|9|10")
}
}

"delete events for one persistenceId before timestamp" in {
  val ackProbe = createTestProbe[Done]()
  val pid = nextPid()
  val persister = spawn(Persister(pid))

  for (n <- 1 to 10) {
    persister ! Persister.PersistWithAck(n, ackProbe.ref)
    ackProbe.expectMessage(Done)
    // short pause, just to be sure that events have different timestamps
    ackProbe.expectNoMessage(1.millis)
  }

  testKit.stop(persister)

  val journalQuery =
    PersistenceQuery(system).readJournalFor[CurrentEventsByPersistenceIdTypedQuery](R2dbcReadJournal.Identifier)

  // reads all events currently stored for pid
  def storedEvents() =
    journalQuery.currentEventsByPersistenceIdTyped[Any](pid, 1L, Long.MaxValue).runWith(Sink.seq).futureValue

  val eventsBefore = storedEvents()
  eventsBefore.size shouldBe 10

  // the timestamp is exclusive, so using the last event's timestamp keeps only that event
  val cleanup = new EventSourcedCleanup(system)
  val lastEvent = eventsBefore.last
  val timestamp = lastEvent.offset.asInstanceOf[TimestampOffset].timestamp
  cleanup.deleteEventsBefore(pid, timestamp).futureValue

  val eventsAfter = storedEvents()
  eventsAfter.size shouldBe 1
  eventsAfter.head.sequenceNr shouldBe lastEvent.sequenceNr
}

"delete events for slice before timestamp" in {
  val ackProbe = createTestProbe[Done]()
  val entityType = nextEntityType()
  val pid1 = PersistenceId(entityType, "a")
  val pid2 = PersistenceId(entityType, "b")
  // the two entities have to be in different slices for the slice-scoped delete to tell them apart
  val slice1 = persistenceExt.sliceForPersistenceId(pid1.id)
  val slice2 = persistenceExt.sliceForPersistenceId(pid2.id)
  slice1 should not be slice2

  val p1 = spawn(Persister(pid1))
  val p2 = spawn(Persister(pid2))

  for (n <- 1 to 10) {
    // odd numbers go to p1, even numbers to p2, so the very last event belongs to p2
    val target = if (n % 2 != 0) p1 else p2
    target ! Persister.PersistWithAck(n, ackProbe.ref)
    ackProbe.expectMessage(Done)
    // short pause, just to be sure that events have different timestamps
    ackProbe.expectNoMessage(1.millis)
  }

  testKit.stop(p1)
  testKit.stop(p2)

  val journalQuery =
    PersistenceQuery(system).readJournalFor[CurrentEventsBySliceQuery](R2dbcReadJournal.Identifier)

  // reads all events currently stored for the entity type, across all slices
  def storedEvents() =
    journalQuery
      .currentEventsBySlices[Any](entityType, 0, persistenceExt.numberOfSlices - 1, Offset.noOffset)
      .runWith(Sink.seq)
      .futureValue

  val eventsBefore = storedEvents()
  eventsBefore.size shouldBe 10
  val lastEvent = eventsBefore.last
  lastEvent.persistenceId shouldBe pid2.id

  // we remove all except last for p2, and p1 should remain untouched
  val cleanup = new EventSourcedCleanup(system)
  val timestamp = lastEvent.offset.asInstanceOf[TimestampOffset].timestamp
  val slice = persistenceExt.sliceForPersistenceId(lastEvent.persistenceId)
  cleanup.deleteEventsBefore(entityType, slice, timestamp).futureValue

  val eventsAfter = storedEvents()
  eventsAfter.count(_.persistenceId == pid1.id) shouldBe 5
  eventsAfter.count(_.persistenceId == pid2.id) shouldBe 1
  eventsAfter.size shouldBe 5 + 1
  eventsAfter.filter(_.persistenceId == pid2.id).last.sequenceNr shouldBe lastEvent.sequenceNr
}
}

}
1 change: 1 addition & 0 deletions docs/src/main/paradox/cleanup.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ not emit further events after that and typically stop itself if it receives more
* Delete all events for one or many persistence ids
* Delete all snapshots for one or many persistence ids
* Delete events before snapshot for one or many persistence ids
* Delete events before a timestamp

The cleanup tool can be combined with the @ref[query plugin](./query.md) which has a query to get all persistence ids.

Expand Down