Skip to content

Commit

Permalink
feat: Migration of Durable State, #504
Browse files Browse the repository at this point in the history
* adding Durable State to the MigrationTool
* currently doesn't support retrieval of all persistenceIds
  because the jdbc plugin doesn't implement PersistenceIdsQuery,
  and jdbc plugin is currently the only other know plugin for
  durable state
  • Loading branch information
patriknw committed Jan 22, 2024
1 parent c789bfd commit df8a684
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 12 deletions.
6 changes: 5 additions & 1 deletion core/src/test/scala/akka/persistence/r2dbc/TestActors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ object TestActors {
def apply(pid: String): Behavior[Command] =
apply(PersistenceId.ofUniqueId(pid))

def apply(pid: PersistenceId): Behavior[Command] = {
def apply(pid: PersistenceId): Behavior[Command] =
apply(pid, storePluginId = "")

def apply(pid: PersistenceId, storePluginId: String): Behavior[Command] = {
Behaviors.setup { context =>
DurableStateBehavior[Command, Any](
persistenceId = pid,
Expand Down Expand Up @@ -185,6 +188,7 @@ object TestActors {
Effect.stop()
}
})
.withDurableStateStorePluginId(storePluginId)
}
}
}
Expand Down
25 changes: 20 additions & 5 deletions docs/src/main/paradox/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ Additionally, add the dependency as below.
## Progress table

To speed up processing of subsequent runs it stores migrated persistence ids and sequence
numbers in the table `migration_progress`. In a subsequent run it will only migrate new events and snapshots
compared to what was stored in `migration_progress`. It will also find and migrate new persistence ids in a
subsequent run. You can delete from `migration_progress` if you want to re-run the full migration.
numbers in the table `migration_progress`. In a subsequent run it will only migrate new events, snapshots
and durable states compared to what was stored in `migration_progress`. It will also find and migrate
new persistence ids in a subsequent run. You can delete from `migration_progress` if you want to
re-run the full migration.

It's recommended that you create the `migration_progress` table before running the migration tool, but
if it doesn't exist the tool will try to create the table.
Expand All @@ -42,13 +43,22 @@ CREATE TABLE IF NOT EXISTS migration_progress(
persistence_id VARCHAR(255) NOT NULL,
event_seq_nr BIGINT,
snapshot_seq_nr BIGINT,
state_revision BIGINT,
PRIMARY KEY(persistence_id)
```

## Configuration
## Running

The migration tool can be run as main class `akka.persistence.r2dbc.migration.MigrationTool` provided by the above
`akka-persistence-r2dbc-migration` dependency.
`akka-persistence-r2dbc-migration` dependency. The main method will run `MigrationTool.migrateAll`.

@@@ note

Durable State is not migrated by `MigrationTool.migrateAll`, instead you need to use `MigrationTool.migrateDurableStates` for a given list of persistence ids.

@@@ note

## Configuration

You need to provide configuration for the source persistence plugin and the target Rd2BC plugin in your `application.conf`. An example of such configuration for migration from Akka Persistence JDBC:

Expand All @@ -60,6 +70,11 @@ Application specific serializers for events and snapshots must also be configure

@@@

When running the migration tool for Durable State the single writer assertion must be disabled with configuration:
```hcon
akka.persistence.r2dbc.state.assert-single-writer = off
```

### Reference configuration

The following can be overridden in your `application.conf` for the migration tool specific settings:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike

import akka.persistence.r2dbc.TestActors.DurableStatePersister

object MigrationToolSpec {
val config: Config = ConfigFactory
.parseString("""
Expand Down Expand Up @@ -53,6 +55,12 @@ object MigrationToolSpec {
use-shared-db = "default"
tables.event_journal.tableName = "jdbc_event_journal"
}
jdbc-durable-state-store {
use-shared-db = "default"
tables.durable_state.tableName = "jdbc_durable_state"
}
akka.persistence.r2dbc.state.assert-single-writer = off
""")
.withFallback(TestConfig.config)
}
Expand All @@ -69,6 +77,7 @@ class MigrationToolSpec
private val migrationConfig = system.settings.config.getConfig("akka.persistence.r2dbc.migration")
private val sourceJournalPluginId = "jdbc-journal"
private val sourceSnapshotPluginId = migrationConfig.getString("source.snapshot-plugin-id")
private val sourceDurableStatePluginId = migrationConfig.getString("source.durable-state-plugin-id")

private val targetPluginId = migrationConfig.getString("target.persistence-plugin-id")

Expand Down Expand Up @@ -128,12 +137,31 @@ class MigrationToolSpec
},
10.seconds)

Await.result(
r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection =>
connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_durable_state (
| global_offset BIGSERIAL,
| persistence_id VARCHAR(255) NOT NULL,
| revision BIGINT NOT NULL,
| state_payload BYTEA NOT NULL,
| state_serial_id INTEGER NOT NULL,
| state_serial_manifest VARCHAR(255),
| tag VARCHAR,
| state_timestamp BIGINT NOT NULL,
| PRIMARY KEY(persistence_id)
|);""".stripMargin)
},
10.seconds)

Await.result(
r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_event_journal")),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_snapshot")),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_durable_state")),
10.seconds)

Await.result(migration.migrationDao.createProgressTable(), 10.seconds)
Await.result(
Expand All @@ -152,6 +180,14 @@ class MigrationToolSpec
probe.expectMessage(Done)
}

private def persistDurableState(pid: PersistenceId, state: Any): Unit = {
val probe = testKit.createTestProbe[Done]()
val persister = testKit.spawn(DurableStatePersister(pid, sourceDurableStatePluginId))
persister ! DurableStatePersister.Persist(state)
persister ! DurableStatePersister.Stop(probe.ref)
probe.expectMessage(Done)
}

private def assertEvents(pid: PersistenceId, expectedEvents: Seq[String]): Unit =
assertState(pid, expectedEvents.mkString("|"))

Expand All @@ -165,6 +201,16 @@ class MigrationToolSpec
probe.expectMessage(Done)
}

private def assertDurableState(pid: PersistenceId, expectedState: String): Unit = {
val probe = testKit.createTestProbe[Any]()
val targetPersister =
testKit.spawn(DurableStatePersister(pid, targetPluginId + ".state"))
targetPersister ! DurableStatePersister.GetState(probe.ref)
probe.expectMessage(expectedState)
targetPersister ! DurableStatePersister.Stop(probe.ref)
probe.expectMessage(Done)
}

"MigrationTool" should {
if (!testEnabled) {
info(
Expand Down Expand Up @@ -275,5 +321,58 @@ class MigrationToolSpec
}
}

"migrate durable state of one persistenceId" in {
val pid = PersistenceId.ofUniqueId(nextPid())
persistDurableState(pid, "s-1")
migration.migrateDurableState(pid.id).futureValue shouldBe 1L
assertDurableState(pid, "s-1")
}

"migrate durable state of a persistenceId several times" in {
val pid = PersistenceId.ofUniqueId(nextPid())
persistDurableState(pid, "s-1")
migration.migrateDurableState(pid.id).futureValue shouldBe 1L
assertDurableState(pid, "s-1")

// running again should be idempotent and not fail
migration.migrateDurableState(pid.id).futureValue shouldBe 0L
assertDurableState(pid, "s-1")

// and running again should find updated revision
persistDurableState(pid, "s-2")
migration.migrateDurableState(pid.id).futureValue shouldBe 1L
assertDurableState(pid, "s-2")
}

"update durable state migration_progress" in {
val pid = PersistenceId.ofUniqueId(nextPid())
migration.migrationDao.currentProgress(pid.id).futureValue.map(_.durableStateRevision) shouldBe None

persistDurableState(pid, "s-1")
migration.migrateDurableState(pid.id).futureValue shouldBe 1L
migration.migrationDao.currentProgress(pid.id).futureValue.map(_.durableStateRevision) shouldBe Some(1L)

// store and migration some more
persistDurableState(pid, "s-2")
persistDurableState(pid, "s-3")
migration.migrateDurableState(pid.id).futureValue shouldBe 1L
migration.migrationDao.currentProgress(pid.id).futureValue.map(_.durableStateRevision) shouldBe Some(3L)
}

"migrate all durable state persistenceIds" in {
val numberOfPids = 10
val pids = (1 to numberOfPids).map(_ => PersistenceId.ofUniqueId(nextPid()))

pids.foreach { pid =>
persistDurableState(pid, s"s-$pid")
}

migration.migrateDurableStates(pids.map(_.id)).futureValue

pids.foreach { pid =>
assertDurableState(pid, s"s-$pid")
}
}

}
}
1 change: 1 addition & 0 deletions migration/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ akka.persistence.r2dbc.migration {
source {
query-plugin-id = "jdbc-read-journal"
snapshot-plugin-id = "jdbc-snapshot-store"
durable-state-plugin-id = "jdbc-durable-state-store"
}

# R2DBC Akka Persistence plugin to migrate to.
Expand Down
Loading

0 comments on commit df8a684

Please sign in to comment.