From ead25b6f2d29dab7e62f0d518564f4cd5acb83ec Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 23 Jan 2024 17:59:57 +0100
Subject: [PATCH] feat: Migration of Durable State, #504 (#506)

---
 .../akka/persistence/r2dbc/TestActors.scala  |   6 +-
 docs/src/main/paradox/migration.md           |  25 +++-
 .../r2dbc/migration/MigrationToolSpec.scala  |  99 +++++++++++++
 migration/src/main/resources/reference.conf  |   1 +
 .../r2dbc/migration/MigrationTool.scala      | 138 +++++++++++++++++-
 .../r2dbc/migration/MigrationToolDao.scala   |  27 +++-
 6 files changed, 284 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala b/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala
index 49dd8b53..57afa503 100644
--- a/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala
+++ b/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala
@@ -146,7 +146,10 @@ object TestActors {
     def apply(pid: String): Behavior[Command] =
       apply(PersistenceId.ofUniqueId(pid))
 
-    def apply(pid: PersistenceId): Behavior[Command] = {
+    def apply(pid: PersistenceId): Behavior[Command] =
+      apply(pid, storePluginId = "")
+
+    def apply(pid: PersistenceId, storePluginId: String): Behavior[Command] = {
       Behaviors.setup { context =>
         DurableStateBehavior[Command, Any](
           persistenceId = pid,
@@ -185,6 +188,7 @@ object TestActors {
             Effect.stop()
           }
         })
+        .withDurableStateStorePluginId(storePluginId)
     }
   }
 }
diff --git a/docs/src/main/paradox/migration.md b/docs/src/main/paradox/migration.md
index ab561878..13573e28 100644
--- a/docs/src/main/paradox/migration.md
+++ b/docs/src/main/paradox/migration.md
@@ -30,9 +30,10 @@ Additionally, add the dependency as below.
 ## Progress table
 
 To speed up processing of subsequent runs it stores migrated persistence ids and sequence
-numbers in the table `migration_progress`. In a subsequent run it will only migrate new events and snapshots
-compared to what was stored in `migration_progress`. It will also find and migrate new persistence ids in a
-subsequent run. You can delete from `migration_progress` if you want to re-run the full migration.
+numbers in the table `migration_progress`. In a subsequent run it will only migrate new events, snapshots
+and durable states compared to what was stored in `migration_progress`. It will also find and migrate
+new persistence ids in a subsequent run. You can delete from `migration_progress` if you want to
+re-run the full migration.
 
 It's recommended that you create the `migration_progress` table before running the migration tool, but
 if it doesn't exist the tool will try to create the table.
@@ -42,13 +43,22 @@ CREATE TABLE IF NOT EXISTS migration_progress(
   persistence_id VARCHAR(255) NOT NULL,
   event_seq_nr BIGINT,
   snapshot_seq_nr BIGINT,
+  state_revision BIGINT,
   PRIMARY KEY(persistence_id)
 ```
 
-## Configuration
+## Running
 
 The migration tool can be run as main class `akka.persistence.r2dbc.migration.MigrationTool` provided by the above
-`akka-persistence-r2dbc-migration` dependency.
+`akka-persistence-r2dbc-migration` dependency. The main method will run `MigrationTool.migrateAll`.
+
+@@@ note
+
+Durable State is not migrated by `MigrationTool.migrateAll`, instead you need to use `MigrationTool.migrateDurableStates` for a given list of persistence ids.
+
+@@@
+
+## Configuration
 
 You need to provide configuration for the source persistence plugin and the target R2DBC plugin in your `application.conf`.
An example of such configuration for migration from Akka Persistence JDBC: @@ -60,6 +70,11 @@ Application specific serializers for events and snapshots must also be configure @@@ +When running the migration tool for Durable State the single writer assertion must be disabled with configuration: +```hcon +akka.persistence.r2dbc.state.assert-single-writer = off +``` + ### Reference configuration The following can be overridden in your `application.conf` for the migration tool specific settings: diff --git a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala index e16422cd..450b3219 100644 --- a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala +++ b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala @@ -20,6 +20,8 @@ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike +import akka.persistence.r2dbc.TestActors.DurableStatePersister + object MigrationToolSpec { val config: Config = ConfigFactory .parseString(""" @@ -53,6 +55,12 @@ object MigrationToolSpec { use-shared-db = "default" tables.event_journal.tableName = "jdbc_event_journal" } + jdbc-durable-state-store { + use-shared-db = "default" + tables.durable_state.tableName = "jdbc_durable_state" + } + + akka.persistence.r2dbc.state.assert-single-writer = off """) .withFallback(TestConfig.config) } @@ -69,6 +77,7 @@ class MigrationToolSpec private val migrationConfig = system.settings.config.getConfig("akka.persistence.r2dbc.migration") private val sourceJournalPluginId = "jdbc-journal" private val sourceSnapshotPluginId = migrationConfig.getString("source.snapshot-plugin-id") + private val sourceDurableStatePluginId = migrationConfig.getString("source.durable-state-plugin-id") private val targetPluginId = migrationConfig.getString("target.persistence-plugin-id") @@ -128,12 +137,31 @@ class MigrationToolSpec }, 10.seconds) + Await.result( + r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection => + connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_durable_state ( + | global_offset BIGSERIAL, + | persistence_id VARCHAR(255) NOT NULL, + | revision BIGINT NOT NULL, + | state_payload BYTEA NOT NULL, + | state_serial_id INTEGER NOT NULL, + | state_serial_manifest VARCHAR(255), + | tag VARCHAR, + | state_timestamp BIGINT NOT NULL, + | PRIMARY KEY(persistence_id) + |);""".stripMargin) + }, + 10.seconds) + Await.result( r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_event_journal")), 10.seconds) Await.result( r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_snapshot")), 10.seconds) + Await.result( + r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_durable_state")), + 10.seconds) Await.result(migration.migrationDao.createProgressTable(), 10.seconds) Await.result( @@ -152,6 +180,14 @@ class MigrationToolSpec probe.expectMessage(Done) } + private def persistDurableState(pid: PersistenceId, state: Any): Unit = { + val probe = testKit.createTestProbe[Done]() + val persister = testKit.spawn(DurableStatePersister(pid, sourceDurableStatePluginId)) + persister ! DurableStatePersister.Persist(state) + persister ! 
DurableStatePersister.Stop(probe.ref) + probe.expectMessage(Done) + } + private def assertEvents(pid: PersistenceId, expectedEvents: Seq[String]): Unit = assertState(pid, expectedEvents.mkString("|")) @@ -165,6 +201,16 @@ class MigrationToolSpec probe.expectMessage(Done) } + private def assertDurableState(pid: PersistenceId, expectedState: String): Unit = { + val probe = testKit.createTestProbe[Any]() + val targetPersister = + testKit.spawn(DurableStatePersister(pid, targetPluginId + ".state")) + targetPersister ! DurableStatePersister.GetState(probe.ref) + probe.expectMessage(expectedState) + targetPersister ! DurableStatePersister.Stop(probe.ref) + probe.expectMessage(Done) + } + "MigrationTool" should { if (!testEnabled) { info( @@ -275,5 +321,58 @@ class MigrationToolSpec } } + "migrate durable state of one persistenceId" in { + val pid = PersistenceId.ofUniqueId(nextPid()) + persistDurableState(pid, "s-1") + migration.migrateDurableState(pid.id).futureValue shouldBe 1L + assertDurableState(pid, "s-1") + } + + "migrate durable state of a persistenceId several times" in { + val pid = PersistenceId.ofUniqueId(nextPid()) + persistDurableState(pid, "s-1") + migration.migrateDurableState(pid.id).futureValue shouldBe 1L + assertDurableState(pid, "s-1") + + // running again should be idempotent and not fail + migration.migrateDurableState(pid.id).futureValue shouldBe 0L + assertDurableState(pid, "s-1") + + // and running again should find updated revision + persistDurableState(pid, "s-2") + migration.migrateDurableState(pid.id).futureValue shouldBe 1L + assertDurableState(pid, "s-2") + } + + "update durable state migration_progress" in { + val pid = PersistenceId.ofUniqueId(nextPid()) + migration.migrationDao.currentProgress(pid.id).futureValue.map(_.durableStateRevision) shouldBe None + + persistDurableState(pid, "s-1") + migration.migrateDurableState(pid.id).futureValue shouldBe 1L + migration.migrationDao.currentProgress(pid.id).futureValue.map(_.durableStateRevision) shouldBe Some(1L) + + // store and migration some more + persistDurableState(pid, "s-2") + persistDurableState(pid, "s-3") + migration.migrateDurableState(pid.id).futureValue shouldBe 1L + migration.migrationDao.currentProgress(pid.id).futureValue.map(_.durableStateRevision) shouldBe Some(3L) + } + + "migrate all durable state persistenceIds" in { + val numberOfPids = 10 + val pids = (1 to numberOfPids).map(_ => PersistenceId.ofUniqueId(nextPid())) + + pids.foreach { pid => + persistDurableState(pid, s"s-$pid") + } + + migration.migrateDurableStates(pids.map(_.id)).futureValue + + pids.foreach { pid => + assertDurableState(pid, s"s-$pid") + } + } + } } diff --git a/migration/src/main/resources/reference.conf b/migration/src/main/resources/reference.conf index 9e831a77..943e394e 100644 --- a/migration/src/main/resources/reference.conf +++ b/migration/src/main/resources/reference.conf @@ -6,6 +6,7 @@ akka.persistence.r2dbc.migration { source { query-plugin-id = "jdbc-read-journal" snapshot-plugin-id = "jdbc-snapshot-store" + durable-state-plugin-id = "jdbc-durable-state-store" } # R2DBC Akka Persistence plugin to migrate to. 
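Editor's note: taken together with the new `durable-state-plugin-id` default in the `reference.conf` above, a minimal `application.conf` for migrating from Akka Persistence JDBC might look roughly like the sketch below. The source plugin ids are the defaults shown above; the target plugin id and the Postgres connection-factory reference are illustrative assumptions that depend on your environment, not something this patch prescribes.

```hocon
# Illustrative sketch only; adjust plugin ids and connection settings to your setup.
akka.persistence.r2dbc.migration {
  source {
    query-plugin-id = "jdbc-read-journal"
    snapshot-plugin-id = "jdbc-snapshot-store"
    durable-state-plugin-id = "jdbc-durable-state-store"
  }
  # R2DBC Akka Persistence plugin to migrate to (assumed default id)
  target.persistence-plugin-id = "akka.persistence.r2dbc"
}

# Connection settings for the target R2DBC plugin, e.g. the bundled Postgres defaults
akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.postgres}

# Required when the migration tool writes Durable State (see the documentation note above)
akka.persistence.r2dbc.state.assert-single-writer = off
```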
diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala index e722417c..d3a96344 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala @@ -5,12 +5,14 @@ package akka.persistence.r2dbc.migration import java.time.Instant + import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try + import akka.Done import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -46,6 +48,12 @@ import akka.util.Timeout import io.r2dbc.spi.R2dbcDataIntegrityViolationException import org.slf4j.LoggerFactory +import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow +import akka.persistence.state.DurableStateStoreRegistry +import akka.persistence.state.scaladsl.DurableStateStore +import akka.persistence.state.scaladsl.GetObjectResult +import akka.stream.scaladsl.Source + object MigrationTool { def main(args: Array[String]): Unit = { ActorSystem(MigrationTool(), "MigrationTool") @@ -54,9 +62,13 @@ object MigrationTool { object Result { val empty: Result = Result(0, 0, 0) } - final case class Result(persistenceIds: Long, events: Long, snapshots: Long) + object DurableStateResult { + val empty: DurableStateResult = DurableStateResult(0, 0) + } + final case class DurableStateResult(persistenceIds: Long, states: Long) + private def apply(): Behavior[Try[Result]] = { Behaviors.setup { context => val migration = new MigrationTool(context.system) @@ -74,6 +86,8 @@ object MigrationTool { } } + private final case class SelectedDurableState(persistenceId: String, revision: Long, value: Any) + } /** @@ -94,7 +108,8 @@ object MigrationTool { * Note: tags are not migrated. */ class MigrationTool(system: ActorSystem[_]) { - import MigrationTool.Result + import MigrationTool.{ DurableStateResult, Result } + import MigrationTool.SelectedDurableState import system.executionContext private implicit val sys: ActorSystem[_] = system @@ -118,6 +133,9 @@ class MigrationTool(system: ActorSystem[_]) { private val targetSnapshotDao = targetR2dbcSettings.connectionFactorySettings.dialect .createSnapshotDao(targetR2dbcSettings, targetConnectionFactory) + private val targetDurableStateDao = + targetR2dbcSettings.connectionFactorySettings.dialect + .createDurableStateDao(targetR2dbcSettings, targetConnectionFactory) private val targetBatch = migrationConfig.getInt("target.batch") @@ -129,6 +147,10 @@ class MigrationTool(system: ActorSystem[_]) { private val sourceSnapshotPluginId = migrationConfig.getString("source.snapshot-plugin-id") private lazy val sourceSnapshotStore = Persistence(system).snapshotStoreFor(sourceSnapshotPluginId) + private val sourceDurableStatePluginId = migrationConfig.getString("source.durable-state-plugin-id") + private lazy val sourceDurableStateStore = + DurableStateStoreRegistry(system).durableStateStoreFor[DurableStateStore[Any]](sourceDurableStatePluginId) + if (targetR2dbcSettings.dialectName == "h2") { log.error("Migrating to H2 using the migration tool not currently supported") } @@ -142,8 +164,10 @@ class MigrationTool(system: ActorSystem[_]) { migrationDao.createProgressTable() /** - * Migrates events and snapshots for all persistence ids. - * @return + * Migrates events, snapshots for all persistence ids. 
+ * + * Note that Durable State is not migrated by this method, instead you need to use + * [[MigrationTool#migrateDurableStates]] for a given list of persistence ids. */ def migrateAll(): Future[Result] = { log.info("Migration started.") @@ -345,4 +369,110 @@ class MigrationTool(system: ActorSystem[_]) { } + /** + * Migrate Durable State for a list of persistence ids. + */ + def migrateDurableStates(persistenceIds: Seq[String]): Future[DurableStateResult] = { + log.info("Migration started.") + val result = + Source(persistenceIds) + .mapAsyncUnordered(parallelism) { persistenceId => + for { + _ <- createProgressTable + currentProgress <- migrationDao.currentProgress(persistenceId) + stateCount <- migrateDurableState(persistenceId, currentProgress) + } yield persistenceId -> DurableStateResult(1, stateCount) + } + .map { case (pid, result @ DurableStateResult(_, states)) => + log.debugN("Migrated persistenceId [{}] with [{}] durable state.", pid, states) + result + } + .runWith(Sink.fold(DurableStateResult.empty) { case (acc, DurableStateResult(_, states)) => + val result = DurableStateResult(acc.persistenceIds + 1, acc.states + states) + if (result.persistenceIds % 100 == 0) + log.infoN("Migrated [{}] persistenceIds with [{}] durable states.", result.persistenceIds, result.states) + result + }) + + result.transform { + case s @ Success(DurableStateResult(persistenceIds, states)) => + log.infoN( + "Migration successful. Migrated [{}] persistenceIds with [{}] durable states.", + persistenceIds, + states) + s + case f @ Failure(exc) => + log.error("Migration failed.", exc) + f + } + } + + /** + * Migrate Durable State for a single persistence id. + */ + def migrateDurableState(persistenceId: String): Future[Int] = { + for { + _ <- createProgressTable + currentProgress <- migrationDao.currentProgress(persistenceId) + stateCount <- migrateDurableState(persistenceId, currentProgress) + } yield stateCount + } + + private def migrateDurableState(persistenceId: String, currentProgress: Option[CurrentProgress]): Future[Int] = { + val progressRevision = currentProgress.map(_.durableStateRevision).getOrElse(0L) + loadSourceDurableState(persistenceId, progressRevision + 1).flatMap { + case None => Future.successful(0) + case Some(selectedDurableState) => + for { + revision <- { + val serializedRow = serializedDurableStateRow(selectedDurableState) + targetDurableStateDao + .upsertState(serializedRow, selectedDurableState.value, changeEvent = None) + .map(_ => selectedDurableState.revision)(ExecutionContexts.parasitic) + } + _ <- migrationDao.updateDurableStateProgress(persistenceId, revision) + } yield 1 + } + } + + private lazy val checkAssertSingleWriter: Unit = { + if (targetR2dbcSettings.durableStateAssertSingleWriter) { + throw new IllegalArgumentException( + "When running the MigrationTool the " + + "`akka.persistence.r2dbc.state.assert-single-writer` configuration must be set to off.") + } + } + + private def serializedDurableStateRow(selectedDurableState: SelectedDurableState): SerializedStateRow = { + val stateAnyRef = selectedDurableState.value.asInstanceOf[AnyRef] + val serializedState = serialization.serialize(stateAnyRef).get + val stateSerializer = serialization.findSerializerFor(stateAnyRef) + val stateManifest = Serializers.manifestFor(stateSerializer, stateAnyRef) + + // not possible to preserve timestamp, because not included in GetObjectResult + val timestamp = Instant.now() + + val serializedRow = SerializedStateRow( + selectedDurableState.persistenceId, + 
selectedDurableState.revision, + timestamp, + timestamp, + Some(serializedState), + stateSerializer.identifier, + stateManifest, + tags = Set.empty // not possible to preserve tags, because not included in GetObjectResult + ) + serializedRow + } + + private def loadSourceDurableState(persistenceId: String, minRevision: Long): Future[Option[SelectedDurableState]] = { + sourceDurableStateStore + .getObject(persistenceId) + .map { + case GetObjectResult(Some(value), revision) if revision >= minRevision => + Some(SelectedDurableState(persistenceId, revision, value)) + case _ => None + } + } + } diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala index c74def1f..769ebe5c 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala @@ -22,7 +22,11 @@ import org.slf4j.LoggerFactory @InternalApi private[r2dbc] object MigrationToolDao { private val log = LoggerFactory.getLogger(classOf[MigrationToolDao]) - final case class CurrentProgress(persistenceId: String, eventSeqNr: Long, snapshotSeqNr: Long) + final case class CurrentProgress( + persistenceId: String, + eventSeqNr: Long, + snapshotSeqNr: Long, + durableStateRevision: Long) } /** @@ -44,6 +48,7 @@ import org.slf4j.LoggerFactory persistence_id VARCHAR(255) NOT NULL, event_seq_nr BIGINT, snapshot_seq_nr BIGINT, + state_revision BIGINT, PRIMARY KEY(persistence_id) )""") } @@ -83,6 +88,23 @@ import org.slf4j.LoggerFactory .map(_ => Done)(ExecutionContexts.parasitic) } + def updateDurableStateProgress(persistenceId: String, revision: Long): Future[Done] = { + r2dbcExecutor + .updateOne(s"upsert migration progress [$persistenceId]") { connection => + connection + .createStatement(sql""" + INSERT INTO migration_progress + (persistence_id, state_revision) + VALUES (?, ?) + ON CONFLICT (persistence_id) + DO UPDATE SET + state_revision = excluded.state_revision""") + .bind(0, persistenceId) + .bind(1, revision) + } + .map(_ => Done)(ExecutionContexts.parasitic) + } + def currentProgress(persistenceId: String): Future[Option[CurrentProgress]] = { r2dbcExecutor.selectOne(s"read migration progress [$persistenceId]")( _.createStatement(sql"SELECT * FROM migration_progress WHERE persistence_id = ?") @@ -91,7 +113,8 @@ import org.slf4j.LoggerFactory CurrentProgress( persistenceId, eventSeqNr = zeroIfNull(row.get("event_seq_nr", classOf[java.lang.Long])), - snapshotSeqNr = zeroIfNull(row.get("snapshot_seq_nr", classOf[java.lang.Long])))) + snapshotSeqNr = zeroIfNull(row.get("snapshot_seq_nr", classOf[java.lang.Long])), + durableStateRevision = zeroIfNull(row.get("state_revision", classOf[java.lang.Long])))) } private def zeroIfNull(n: java.lang.Long): Long =
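Editor's note: because the bundled main class only runs `migrateAll`, Durable State migration has to be triggered programmatically with the new `migrateDurableStates` API added in this patch. A minimal sketch of such a caller is shown below, assuming `akka.persistence.r2dbc.state.assert-single-writer = off` is configured; the object name, actor system name and persistence ids are illustrative, not part of the plugin.

```scala
import scala.util.{ Failure, Success }

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.r2dbc.migration.MigrationTool

object DurableStateMigrationApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem(Behaviors.empty, "DurableStateMigration")
    import system.executionContext

    val migration = new MigrationTool(system)

    // The tool expects the list of Durable State persistence ids to be supplied
    // by the application (illustrative values here).
    val persistenceIds = Seq("ShoppingCart|cart-1", "ShoppingCart|cart-2")

    migration.migrateDurableStates(persistenceIds).onComplete {
      case Success(result) =>
        system.log.info("Migrated durable state for {} persistence ids", result.persistenceIds)
        system.terminate()
      case Failure(exc) =>
        system.log.error("Durable state migration failed", exc)
        system.terminate()
    }
  }
}
```

A migrated value can then be spot-checked by reading it back from the target durable state store (plugin id `akka.persistence.r2dbc.state` with the default target setting), which is essentially what `assertDurableState` in `MigrationToolSpec` verifies.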