diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml
index a7a660ed6..8ff4fea3c 100644
--- a/.github/workflows/checks.yml
+++ b/.github/workflows/checks.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- master
+ - migration-tool # remove before merging to master
tags-ignore: [ v.* ]
jobs:
diff --git a/.github/workflows/h2-test.yml b/.github/workflows/h2-test.yml
index 5537653e7..e42b4ab67 100644
--- a/.github/workflows/h2-test.yml
+++ b/.github/workflows/h2-test.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- master
+ - migration-tool # remove before merging to master
tags-ignore: [ v.* ]
jobs:
diff --git a/.github/workflows/mysql-tests.yml b/.github/workflows/mysql-tests.yml
index c539fd936..6ba236b23 100644
--- a/.github/workflows/mysql-tests.yml
+++ b/.github/workflows/mysql-tests.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- master
+ - migration-tool # remove before merging to master
tags-ignore: [ v.* ]
jobs:
diff --git a/.github/workflows/oracle-tests.yml b/.github/workflows/oracle-tests.yml
index fd58a719c..89f2a4450 100644
--- a/.github/workflows/oracle-tests.yml
+++ b/.github/workflows/oracle-tests.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- master
+ - migration-tool # remove before merging to master
tags-ignore: [ v.* ]
jobs:
diff --git a/.github/workflows/postgres-tests.yml b/.github/workflows/postgres-tests.yml
index cdf1d39d8..b3e7bde50 100644
--- a/.github/workflows/postgres-tests.yml
+++ b/.github/workflows/postgres-tests.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- master
+ - migration-tool # remove before merging to master
tags-ignore: [ v.* ]
jobs:
diff --git a/.github/workflows/sqlserver-tests.yml b/.github/workflows/sqlserver-tests.yml
index 67baad746..46681ad1d 100644
--- a/.github/workflows/sqlserver-tests.yml
+++ b/.github/workflows/sqlserver-tests.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- master
+ - migration-tool # remove before merging to master
tags-ignore: [ v.* ]
jobs:
diff --git a/build.sbt b/build.sbt
index 6ddd2005f..6f10592f1 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,13 +1,13 @@
import com.lightbend.paradox.apidoc.ApidocPlugin.autoImport.apidocRootPackage
// FIXME remove when switching to final Akka version
-resolvers in ThisBuild += "Akka Snapshots".at("https://oss.sonatype.org/content/repositories/snapshots/")
+ThisBuild / resolvers += "Akka Snapshots".at("https://oss.sonatype.org/content/repositories/snapshots/")
lazy val `akka-persistence-jdbc` = project
.in(file("."))
.enablePlugins(ScalaUnidocPlugin)
.disablePlugins(MimaPlugin, SitePlugin)
- .aggregate(core, migration, docs)
+ .aggregate(core, docs, migrator)
.settings(publish / skip := true)
lazy val core = project
@@ -24,13 +24,17 @@ lazy val core = project
organization.value %% name.value % previousStableVersion.value.getOrElse(
throw new Error("Unable to determine previous version for MiMa"))))
-lazy val migration = project
- .in(file("migration"))
+lazy val migrator = project
+ .in(file("migrator"))
.disablePlugins(SitePlugin, MimaPlugin)
+ .configs(IntegrationTest.extend(Test))
+ .settings(Defaults.itSettings)
.settings(
- name := "akka-persistence-jdbc-migration",
- libraryDependencies ++= Dependencies.Migration,
+ name := "akka-persistence-jdbc-migrator",
+ libraryDependencies ++= Dependencies.Migration ++ Dependencies.Libraries,
+ // TODO remove this when ready to publish it
publish / skip := true)
+ .dependsOn(core % "compile->compile;test->test")
lazy val docs = project
.enablePlugins(ProjectAutoPlugin, AkkaParadoxPlugin, ParadoxSitePlugin, PreprocessPlugin, PublishRsyncPlugin)
diff --git a/migration/src/main/resources/db/migration/postgres/V001__test.sql b/migration/src/main/resources/db/migration/postgres/V001__test.sql
deleted file mode 100644
index e69de29bb..000000000
diff --git a/migration/src/main/resources/db/migration/postgres/V002__test-2.sql b/migration/src/main/resources/db/migration/postgres/V002__test-2.sql
deleted file mode 100644
index 1661fb77c..000000000
--- a/migration/src/main/resources/db/migration/postgres/V002__test-2.sql
+++ /dev/null
@@ -1,9 +0,0 @@
-CREATE TABLE IF NOT EXISTS public.migrated2 (
- persistence_id VARCHAR(255) NOT NULL,
- sequence_number BIGINT NOT NULL,
- created BIGINT NOT NULL,
- snapshot BYTEA NOT NULL
-);
-CREATE TABLE test_user (
- name VARCHAR(200)
-);
diff --git a/migration/src/main/resources/logback.xml b/migration/src/main/resources/logback.xml
deleted file mode 100644
index 16b67de61..000000000
--- a/migration/src/main/resources/logback.xml
+++ /dev/null
@@ -1,21 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<configuration>
-
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-            <level>debug</level>
-        </filter>
-        <encoder>
-            <pattern>%date{ISO8601} - %logger -> %-5level[%thread] %logger{0} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="debug">
-        <appender-ref ref="STDOUT"/>
-    </root>
-
-</configuration>
-
-
-
diff --git a/migration/src/main/resources/reference.conf b/migration/src/main/resources/reference.conf
deleted file mode 100644
index 413c9f220..000000000
--- a/migration/src/main/resources/reference.conf
+++ /dev/null
@@ -1,9 +0,0 @@
-akka-persistence-jdbc {
- migration {
- # supported values: postgres
- database-vendor = ""
- url = ""
- user = ""
- password = ""
- }
-}
diff --git a/migration/src/main/scala/akka/persistence/jdbc/migration/Main.scala b/migration/src/main/scala/akka/persistence/jdbc/migration/Main.scala
deleted file mode 100644
index 55e843ca3..000000000
--- a/migration/src/main/scala/akka/persistence/jdbc/migration/Main.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2014 - 2019 Dennis Vriend
- * Copyright (C) 2019 - 2021 Lightbend Inc.
- */
-
-package akka.persistence.jdbc.migration
-
-import com.typesafe.config.{ Config, ConfigFactory }
-import org.flywaydb.core.Flyway
-import org.flywaydb.core.api.Location
-
-object Main extends App {
-
- val config = ConfigFactory.load().getConfig("akka-persistence-jdbc.migration")
-
- def run(config: Config): Unit = {
- val vendor = config.getString("database-vendor")
- val url = config.getString("url")
- val user = config.getString("user")
- val password = config.getString("password")
-
- val flywayConfig = Flyway.configure.dataSource(url, user, password).table("apjdbc_schema_history")
-
- vendor match {
- case "postgres" =>
- flywayConfig.locations(new Location("classpath:db/migration/postgres"))
- case other =>
- sys.error(s"Akka Persistence JDBC migrations do not support `$other` (supported are `postgres`)")
- }
-
- val flyway = flywayConfig.load
- flyway.baseline()
- flyway.migrate()
-
- }
-}
diff --git a/migration/src/main/scala/db/migration/postgres/V003__UpdateUsers.scala b/migration/src/main/scala/db/migration/postgres/V003__UpdateUsers.scala
deleted file mode 100644
index 409fd03a9..000000000
--- a/migration/src/main/scala/db/migration/postgres/V003__UpdateUsers.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (C) 2014 - 2019 Dennis Vriend
- * Copyright (C) 2019 - 2021 Lightbend Inc.
- */
-
-package db.migration.postgres
-
-import org.flywaydb.core.api.migration.{ BaseJavaMigration, Context }
-
-class V003__UpdateUsers extends BaseJavaMigration {
-
- @throws[Exception]
- override def migrate(context: Context): Unit = {
- try {
- val statement = context.getConnection.prepareStatement("INSERT INTO test_user (name) VALUES ('Obelix')")
- try statement.execute
- finally if (statement != null) statement.close()
- }
- }
-
-}
diff --git a/migration/src/test/resources/postgres/init.sql b/migration/src/test/resources/postgres/init.sql
deleted file mode 100644
index 232eba2f3..000000000
--- a/migration/src/test/resources/postgres/init.sql
+++ /dev/null
@@ -1,25 +0,0 @@
-DROP TABLE IF EXISTS public.apjdbc_schema_history;
-
-DROP TABLE IF EXISTS public.journal;
-
-CREATE TABLE IF NOT EXISTS public.journal (
- ordering BIGSERIAL,
- persistence_id VARCHAR(255) NOT NULL,
- sequence_number BIGINT NOT NULL,
- deleted BOOLEAN DEFAULT FALSE NOT NULL,
- tags VARCHAR(255) DEFAULT NULL,
- message BYTEA NOT NULL,
- PRIMARY KEY(persistence_id, sequence_number)
-);
-
-CREATE UNIQUE INDEX journal_ordering_idx ON public.journal(ordering);
-
-DROP TABLE IF EXISTS public.snapshot;
-
-CREATE TABLE IF NOT EXISTS public.snapshot (
- persistence_id VARCHAR(255) NOT NULL,
- sequence_number BIGINT NOT NULL,
- created BIGINT NOT NULL,
- snapshot BYTEA NOT NULL,
- PRIMARY KEY(persistence_id, sequence_number)
-);
diff --git a/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala b/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala
deleted file mode 100644
index d2a0be04a..000000000
--- a/migration/src/test/scala/akka/persistence/jdbc/migration/PostgresSpec.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (C) 2014 - 2019 Dennis Vriend
- * Copyright (C) 2019 - 2021 Lightbend Inc.
- */
-
-package akka.persistence.jdbc.migration
-
-import java.sql.{ Connection, DriverManager }
-import java.util.Properties
-
-import com.typesafe.config.{ Config, ConfigFactory }
-import org.scalatest.BeforeAndAfterAll
-import org.testcontainers.containers.PostgreSQLContainer
-import org.scalatest.flatspec.AnyFlatSpec
-import org.scalatest.matchers.should.Matchers
-
-class PostgresSpec extends AnyFlatSpec with Matchers with BeforeAndAfterAll {
-
- val postgres: PostgreSQLContainer[_] = {
- val c = new PostgreSQLContainer("postgres:13.1")
- c.withDatabaseName("public")
- c.withInitScript("postgres/init.sql")
- c
- }
- var migrationConfig: Config = null
- val connectionProperties = new Properties()
-
- override def beforeAll(): Unit = {
- postgres.start()
- migrationConfig = ConfigFactory
- .parseString(s"""migration {
- |database-vendor = postgres
- |url = "${postgres.getJdbcUrl}"
- |user = "${postgres.getUsername}"
- |password = "${postgres.getPassword}"
- |}""".stripMargin)
- .getConfig("migration")
-
- connectionProperties.put("user", postgres.getUsername);
- connectionProperties.put("password", postgres.getPassword);
- }
-
- override def afterAll(): Unit = {
- postgres.stop()
- }
-
- "Migration 002" should "be applied" in {
- Main.run(migrationConfig)
- val connection = DriverManager.getConnection(postgres.getJdbcUrl, connectionProperties);
- println(existingTables(connection))
- val stmt = connection.createStatement()
- stmt.executeQuery("SELECT * FROM migrated2;")
- }
-
- "Scala migration 003" should "be applied" in {
- Main.run(migrationConfig)
- val connection = DriverManager.getConnection(postgres.getJdbcUrl, connectionProperties);
- println(existingTables(connection))
- val stmt = connection.createStatement()
- val rs = stmt.executeQuery("SELECT * FROM test_user;")
- val sb = new StringBuilder()
- while (rs.next()) {
- sb.append(rs.getString(1)).append("\n")
- }
- sb.toString() shouldBe "Obelix\n"
- }
-
- private def existingTables(connection: Connection) = {
- val stmt = connection.createStatement()
- val rs = stmt.executeQuery(
- "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema');")
- val sb = new StringBuilder("Existing tables:\n")
- while (rs.next()) {
- sb.append(" " + rs.getString(1) + "." + rs.getString(2) + "\n")
- }
- sb.toString()
- }
-}
diff --git a/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/JournalMigratorTest.scala b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/JournalMigratorTest.scala
new file mode 100644
index 000000000..1fdd34da9
--- /dev/null
+++ b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/JournalMigratorTest.scala
@@ -0,0 +1,12 @@
+package akka.persistence.jdbc.migrator.integration
+
+import akka.persistence.jdbc.migrator.MigratorSpec._
+import akka.persistence.jdbc.migrator.JournalMigratorTest
+
+class PostgresJournalMigratorTest extends JournalMigratorTest("postgres-application.conf") with PostgresCleaner
+
+class MySQLJournalMigratorTest extends JournalMigratorTest("mysql-application.conf") with MysqlCleaner
+
+class OracleJournalMigratorTest extends JournalMigratorTest("oracle-application.conf") with OracleCleaner
+
+class SqlServerJournalMigratorTest extends JournalMigratorTest("sqlserver-application.conf") with SqlServerCleaner
diff --git a/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/SnapshotMigratorTest.scala b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/SnapshotMigratorTest.scala
new file mode 100644
index 000000000..434f093c0
--- /dev/null
+++ b/migrator/src/it/scala/akka/persistence/jdbc/migrator/integration/SnapshotMigratorTest.scala
@@ -0,0 +1,12 @@
+package akka.persistence.jdbc.migrator.integration
+
+import akka.persistence.jdbc.migrator.MigratorSpec._
+import akka.persistence.jdbc.migrator.SnapshotMigratorTest
+
+class PostgresSnapshotMigratorTest extends SnapshotMigratorTest("postgres-application.conf") with PostgresCleaner
+
+class MySQLSnapshotMigratorTest extends SnapshotMigratorTest("mysql-application.conf") with MysqlCleaner
+
+class OracleSnapshotMigratorTest extends SnapshotMigratorTest("oracle-application.conf") with OracleCleaner
+
+class SqlServerSnapshotMigratorTest extends SnapshotMigratorTest("sqlserver-application.conf") with SqlServerCleaner
diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala
new file mode 100644
index 000000000..ed4fa8b9e
--- /dev/null
+++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala
@@ -0,0 +1,163 @@
+/*
+ * Copyright (C) 2014 - 2019 Dennis Vriend
+ * Copyright (C) 2019 - 2021 Lightbend Inc.
+ */
+
+package akka.persistence.jdbc.migrator
+
+import akka.Done
+import akka.actor.ActorSystem
+import akka.persistence.PersistentRepr
+import akka.persistence.jdbc.AkkaSerialization
+import akka.persistence.jdbc.config.{ JournalConfig, ReadJournalConfig }
+import akka.persistence.jdbc.db.SlickExtension
+import akka.persistence.jdbc.journal.dao.JournalQueries
+import akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalSerializer
+import akka.persistence.jdbc.journal.dao.JournalTables.{ JournalAkkaSerializationRow, TagRow }
+import akka.persistence.jdbc.migrator.JournalMigrator.{ JournalConfig, ReadJournalConfig }
+import akka.persistence.jdbc.query.dao.legacy.ReadJournalQueries
+import akka.serialization.{ Serialization, SerializationExtension }
+import akka.stream.scaladsl.Source
+import org.slf4j.{ Logger, LoggerFactory }
+import slick.jdbc._
+
+import scala.concurrent.{ ExecutionContextExecutor, Future }
+import scala.util.{ Failure, Success }
+
+/**
+ * This will help migrate the legacy journal data onto the new journal schema with the
+ * appropriate serialization
+ *
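+ * A minimal usage sketch (assuming an implicit ActorSystem whose configuration contains both the
+ * legacy and the new journal tables; the Slick profile can be resolved as in the migrator tests):
+ * {{{
+ *   val profile = akka.persistence.jdbc.db.SlickDatabase.profile(system.settings.config, "slick")
+ *   val done: scala.concurrent.Future[akka.Done] = JournalMigrator(profile).migrate()
+ * }}}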
+ * @param system the actor system
+ */
+final case class JournalMigrator(profile: JdbcProfile)(implicit system: ActorSystem) {
+ implicit val ec: ExecutionContextExecutor = system.dispatcher
+
+ import profile.api._
+
+ val log: Logger = LoggerFactory.getLogger(getClass)
+
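+  // Note: the JournalConfig and ReadJournalConfig *values* imported from the companion object are the
+  // names of the HOCON sections ("jdbc-journal" / "jdbc-read-journal"); the configuration *classes* of
+  // the same name come from akka.persistence.jdbc.config, so the two imports do not clash.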
+ // get the various configurations
+ private val journalConfig: JournalConfig = new JournalConfig(system.settings.config.getConfig(JournalConfig))
+ private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig(
+ system.settings.config.getConfig(ReadJournalConfig))
+
+ // the journal database
+ private val journalDB: JdbcBackend.Database =
+ SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database
+
+ // get an instance of the new journal queries
+ private val newJournalQueries: JournalQueries =
+ new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration)
+
+  // serialization and legacy read-journal queries used to read the old journal
+ private val serialization: Serialization = SerializationExtension(system)
+ private val legacyJournalQueries: ReadJournalQueries = new ReadJournalQueries(profile, readJournalConfig)
+ private val serializer: ByteArrayJournalSerializer =
+ new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator)
+
+ private val bufferSize: Int = journalConfig.daoConfig.bufferSize
+
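+  // read the legacy journal as a forward-only, read-only stream, fetching `bufferSize` rows at a time
+  // so the whole table is never loaded into memory; the transaction is required by some databases
+  // (e.g. PostgreSQL) to keep the server-side cursor open while streaming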
+ private val query =
+ legacyJournalQueries.JournalTable.result
+ .withStatementParameters(
+ rsType = ResultSetType.ForwardOnly,
+ rsConcurrency = ResultSetConcurrency.ReadOnly,
+ fetchSize = bufferSize)
+ .transactionally
+
+ /**
+ * write all legacy events into the new journal tables applying the proper serialization
+ */
+ def migrate(): Future[Done] = Source
+ .fromPublisher(journalDB.stream(query))
+ .via(serializer.deserializeFlow)
+ .map {
+ case Success((repr, tags, ordering)) => (repr, tags, ordering)
+ case Failure(exception) => throw exception // blow-up on failure
+ }
+ .map { case (repr, tags, ordering) => serialize(repr, tags, ordering) }
+ // get pages of many records at once
+ .grouped(bufferSize)
+ .mapAsync(1)(records => {
+ val stmt: DBIO[Unit] = records
+ // get all the sql statements for this record as an option
+ .map { case (newRepr, newTags) =>
+ log.debug(s"migrating event for PersistenceID: ${newRepr.persistenceId} with tags ${newTags.mkString(",")}")
+ writeJournalRowsStatements(newRepr, newTags)
+ }
+ // reduce to 1 statement
+ .foldLeft[DBIO[Unit]](DBIO.successful[Unit] {})((priorStmt, nextStmt) => {
+ priorStmt.andThen(nextStmt)
+ })
+
+ journalDB.run(stmt)
+ })
+ .run()
+
+ /**
+ * serialize the PersistentRepr and construct a JournalAkkaSerializationRow and set of matching tags
+ *
+ * @param repr the PersistentRepr
+ * @param tags the tags
+ * @param ordering the ordering of the PersistentRepr
+ * @return the tuple of JournalAkkaSerializationRow and set of tags
+ */
+ private def serialize(
+ repr: PersistentRepr,
+ tags: Set[String],
+ ordering: Long): (JournalAkkaSerializationRow, Set[String]) = {
+
+ val serializedPayload: AkkaSerialization.AkkaSerialized =
+ AkkaSerialization.serialize(serialization, repr.payload).get
+
+ val serializedMetadata: Option[AkkaSerialization.AkkaSerialized] =
+ repr.metadata.flatMap(m => AkkaSerialization.serialize(serialization, m).toOption)
+ val row: JournalAkkaSerializationRow = JournalAkkaSerializationRow(
+ ordering,
+ repr.deleted,
+ repr.persistenceId,
+ repr.sequenceNr,
+ repr.writerUuid,
+ repr.timestamp,
+ repr.manifest,
+ serializedPayload.payload,
+ serializedPayload.serId,
+ serializedPayload.serManifest,
+ serializedMetadata.map(_.payload),
+ serializedMetadata.map(_.serId),
+ serializedMetadata.map(_.serManifest))
+
+ (row, tags)
+ }
+
+ private def writeJournalRowsStatements(
+ journalSerializedRow: JournalAkkaSerializationRow,
+ tags: Set[String]): DBIO[Unit] = {
+ val journalInsert: DBIO[Long] = newJournalQueries.JournalTable
+ .returning(newJournalQueries.JournalTable.map(_.ordering))
+ .forceInsert(journalSerializedRow)
+
+ val tagInserts =
+ newJournalQueries.TagTable ++= tags.map(tag => TagRow(journalSerializedRow.ordering, tag)).toSeq
+
+    journalInsert.flatMap(_ => tagInserts.map(_ => ()))
+ }
+}
+
+case object JournalMigrator {
+ final val JournalConfig: String = "jdbc-journal"
+ final val ReadJournalConfig: String = "jdbc-read-journal"
+}
diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala
new file mode 100644
index 000000000..99ad97685
--- /dev/null
+++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/SnapshotMigrator.scala
@@ -0,0 +1,106 @@
+/*
+ * Copyright (C) 2014 - 2019 Dennis Vriend
+ * Copyright (C) 2019 - 2021 Lightbend Inc.
+ */
+
+package akka.persistence.jdbc.migrator
+
+import akka.actor.ActorSystem
+import akka.persistence.SnapshotMetadata
+import akka.persistence.jdbc.config.{ ReadJournalConfig, SnapshotConfig }
+import akka.persistence.jdbc.db.SlickExtension
+import akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao
+import akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao
+import akka.persistence.jdbc.snapshot.dao.legacy.{ ByteArraySnapshotSerializer, SnapshotQueries }
+import akka.persistence.jdbc.snapshot.dao.legacy.SnapshotTables.SnapshotRow
+import akka.serialization.{ Serialization, SerializationExtension }
+import akka.stream.scaladsl.{ Sink, Source }
+import akka.Done
+import akka.persistence.jdbc.migrator.JournalMigrator.ReadJournalConfig
+import akka.persistence.jdbc.migrator.SnapshotMigrator.{ NoParallelism, SnapshotStoreConfig }
+import org.slf4j.{ Logger, LoggerFactory }
+import slick.jdbc
+import slick.jdbc.{ JdbcBackend, JdbcProfile }
+
+import scala.concurrent.Future
+
+/**
+ * This will help migrate the legacy snapshot data onto the new snapshot schema with the
+ * appropriate serialization
+ *
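+ * A minimal usage sketch (assuming the same setup as for [[JournalMigrator]]):
+ * {{{
+ *   SnapshotMigrator(profile).migrateLatest() // copies only the latest snapshot per persistence id
+ *   // or SnapshotMigrator(profile).migrateAll() to copy every legacy snapshot row
+ * }}}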
+ * @param system the actor system
+ */
+case class SnapshotMigrator(profile: JdbcProfile)(implicit system: ActorSystem) {
+ val log: Logger = LoggerFactory.getLogger(getClass)
+
+ import system.dispatcher
+ import profile.api._
+
+ private val snapshotConfig: SnapshotConfig = new SnapshotConfig(system.settings.config.getConfig(SnapshotStoreConfig))
+ private val readJournalConfig: ReadJournalConfig = new ReadJournalConfig(
+ system.settings.config.getConfig(ReadJournalConfig))
+
+ private val snapshotDB: jdbc.JdbcBackend.Database =
+ SlickExtension(system).database(system.settings.config.getConfig(SnapshotStoreConfig)).database
+
+ private val journalDB: JdbcBackend.Database =
+ SlickExtension(system).database(system.settings.config.getConfig(ReadJournalConfig)).database
+
+ private val serialization: Serialization = SerializationExtension(system)
+ private val queries: SnapshotQueries = new SnapshotQueries(profile, snapshotConfig.legacySnapshotTableConfiguration)
+ private val serializer: ByteArraySnapshotSerializer = new ByteArraySnapshotSerializer(serialization)
+
+  // get the instance of the default snapshot dao
+ private val defaultSnapshotDao: DefaultSnapshotDao =
+ new DefaultSnapshotDao(snapshotDB, profile, snapshotConfig, serialization)
+
+ // get the instance of the legacy journal DAO
+ private val legacyJournalDao: ByteArrayReadJournalDao =
+ new ByteArrayReadJournalDao(journalDB, profile, readJournalConfig, SerializationExtension(system))
+
+ private def toSnapshotData(row: SnapshotRow): (SnapshotMetadata, Any) = serializer.deserialize(row).get
+
+ /**
+ * migrate the latest snapshot data
+ */
+ def migrateLatest(): Future[Done] = {
+ legacyJournalDao
+ .allPersistenceIdsSource(Long.MaxValue)
+ .mapAsync(NoParallelism) { persistenceId =>
+ // let us fetch the latest snapshot for each persistenceId
+        snapshotDB.run(queries.selectLatestByPersistenceId(persistenceId).result).flatMap { rows =>
+          rows.headOption.map(toSnapshotData) match {
+            case Some((metadata, value)) =>
+              log.debug(s"migrating snapshot for ${metadata.toString}")
+              // wait for the save to complete so the stream only finishes once the snapshot is written
+              defaultSnapshotDao.save(metadata, value)
+            case None => Future.successful(())
+          }
+        }
+ }
+ .runWith(Sink.ignore)
+ }
+
+ /**
+ * migrate all the legacy snapshot schema data into the new snapshot schema
+ */
+ def migrateAll(): Future[Done] = Source
+ .fromPublisher(snapshotDB.stream(queries.SnapshotTable.result))
+ .mapAsync(NoParallelism) { record =>
+ val (metadata, value) = toSnapshotData(record)
+ log.debug(s"migrating snapshot for ${metadata.toString}")
+ defaultSnapshotDao.save(metadata, value)
+ }
+ .run()
+}
+
+case object SnapshotMigrator {
+ final val SnapshotStoreConfig: String = "jdbc-snapshot-store"
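+  // snapshots are copied one at a time (mapAsync parallelism of 1) to keep the writes strictly sequential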
+ final val NoParallelism: Int = 1
+}
diff --git a/migrator/src/test/resources/general.conf b/migrator/src/test/resources/general.conf
new file mode 100644
index 000000000..8680b4e7e
--- /dev/null
+++ b/migrator/src/test/resources/general.conf
@@ -0,0 +1,49 @@
+# Copyright (C) 2019 - 2021 Lightbend Inc.
+#
+
+// This file contains the general settings which are shared in all akka-persistence-jdbc tests
+
+akka {
+  stdout-loglevel = off // defaults to WARNING, can be disabled with off. The stdout-loglevel is only in effect during system startup and shutdown
+ log-dead-letters-during-shutdown = on
+ loglevel = debug
+ log-dead-letters = on
+ log-config-on-start = off // Log the complete configuration at INFO level when the actor system is started
+
+ loggers = ["akka.event.slf4j.Slf4jLogger"]
+ logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+
+ actor {
+ // Required until https://github.com/akka/akka/pull/28333 is available
+ allow-java-serialization = on
+ debug {
+      receive = on // log all messages sent to an actor if that actor's receive method is a LoggingReceive
+      autoreceive = off // log all special messages like Kill, PoisonPill etc. sent to all actors
+      lifecycle = off // log all actor lifecycle events of all actors
+      fsm = off // enable logging of all events, transitions and timers of FSM Actors that extend LoggingFSM
+ event-stream = off // enable logging of subscriptions (subscribe/unsubscribe) on the ActorSystem.eventStream
+ }
+ }
+}
+
+docker {
+ host = "localhost"
+ host = ${?VM_HOST}
+}
+
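+// The event adapter below is defined in MigratorSpec and tags the test AccountEvents, which is what
+// the eventsByTag assertions in the migrator tests rely on.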
+jdbc-journal {
+ event-adapters {
+ event-adapter = "akka.persistence.jdbc.migrator.MigratorSpec$AccountEventAdapter"
+ }
+
+ event-adapter-bindings {
+ "akka.persistence.jdbc.migrator.MigratorSpec$AccountEvent" = event-adapter
+ }
+}
+
+// The default configurations of the legacy and the new snapshot table both use the same name (tableName = "snapshot"), so the legacy table is given a distinct name here
+jdbc-snapshot-store.tables.legacy_snapshot.tableName = "legacy_snapshot"
+
+slick.db.idleTimeout = 10000 // 10 seconds
diff --git a/migrator/src/test/resources/h2-application.conf b/migrator/src/test/resources/h2-application.conf
new file mode 100644
index 000000000..2c7f048bf
--- /dev/null
+++ b/migrator/src/test/resources/h2-application.conf
@@ -0,0 +1,42 @@
+# Copyright (C) 2019 - 2021 Lightbend Inc.
+
+// general.conf is included only for shared settings used for the akka-persistence-jdbc tests
+include "general.conf"
+
+akka {
+ persistence {
+ journal {
+ plugin = "jdbc-journal"
+ }
+ snapshot-store {
+ plugin = "jdbc-snapshot-store"
+ }
+ }
+}
+
+jdbc-journal {
+ slick = ${slick}
+}
+
+# the akka-persistence-snapshot-store in use
+jdbc-snapshot-store {
+ slick = ${slick}
+}
+
+# the akka-persistence-query provider in use
+jdbc-read-journal {
+ slick = ${slick}
+}
+
+slick {
+ profile = "slick.jdbc.H2Profile$"
+ db {
+ url = "jdbc:h2:mem:test-database;DATABASE_TO_UPPER=false;"
+ user = "root"
+ password = "root"
+ driver = "org.h2.Driver"
+ numThreads = 5
+ maxConnections = 5
+ minConnections = 1
+ }
+}
diff --git a/migrator/src/test/resources/mysql-application.conf b/migrator/src/test/resources/mysql-application.conf
new file mode 100644
index 000000000..6b9cf8b46
--- /dev/null
+++ b/migrator/src/test/resources/mysql-application.conf
@@ -0,0 +1,48 @@
+# Copyright (C) 2019 - 2021 Lightbend Inc.
+
+// general.conf is included only for shared settings used for the akka-persistence-jdbc tests
+include "general.conf"
+
+akka {
+ persistence {
+ journal {
+ plugin = "jdbc-journal"
+ // Enable the line below to automatically start the journal when the actorsystem is started
+ // auto-start-journals = ["jdbc-journal"]
+ }
+ snapshot-store {
+ plugin = "jdbc-snapshot-store"
+ // Enable the line below to automatically start the snapshot-store when the actorsystem is started
+ // auto-start-snapshot-stores = ["jdbc-snapshot-store"]
+ }
+ }
+}
+
+jdbc-journal {
+ slick = ${slick}
+}
+
+# the akka-persistence-snapshot-store in use
+jdbc-snapshot-store {
+ slick = ${slick}
+}
+
+# the akka-persistence-query provider in use
+jdbc-read-journal {
+ slick = ${slick}
+}
+
+slick {
+ profile = "slick.jdbc.MySQLProfile$"
+ db {
+ host = ${docker.host}
+ host = ${?DB_HOST}
+ url = "jdbc:mysql://"${slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
+ user = "root"
+ password = "root"
+ driver = "com.mysql.cj.jdbc.Driver"
+ numThreads = 5
+ maxConnections = 5
+ minConnections = 1
+ }
+}
diff --git a/migrator/src/test/resources/oracle-application.conf b/migrator/src/test/resources/oracle-application.conf
new file mode 100644
index 000000000..c1e072b86
--- /dev/null
+++ b/migrator/src/test/resources/oracle-application.conf
@@ -0,0 +1,50 @@
+# Copyright (C) 2019 - 2021 Lightbend Inc.
+
+// general.conf is included only for shared settings used for the akka-persistence-jdbc tests
+include "general.conf"
+include "oracle-schema-overrides.conf"
+
+akka {
+ persistence {
+ journal {
+ plugin = "jdbc-journal"
+ // Enable the line below to automatically start the journal when the actorsystem is started
+ // auto-start-journals = ["jdbc-journal"]
+ }
+ snapshot-store {
+ plugin = "jdbc-snapshot-store"
+ // Enable the line below to automatically start the snapshot-store when the actorsystem is started
+ // auto-start-snapshot-stores = ["jdbc-snapshot-store"]
+ }
+ }
+}
+
+
+jdbc-journal {
+ slick = ${slick}
+}
+
+# the akka-persistence-snapshot-store in use
+jdbc-snapshot-store {
+ slick = ${slick}
+}
+
+# the akka-persistence-query provider in use
+jdbc-read-journal {
+ slick = ${slick}
+}
+
+slick {
+ profile = "slick.jdbc.OracleProfile$"
+ db {
+ host = ${docker.host}
+ host = ${?DB_HOST}
+ url = "jdbc:oracle:thin:@//"${slick.db.host}":1521/xe"
+ user = "system"
+ password = "oracle"
+ driver = "oracle.jdbc.OracleDriver"
+ numThreads = 5
+ maxConnections = 5
+ minConnections = 1
+ }
+}
diff --git a/migrator/src/test/resources/postgres-application.conf b/migrator/src/test/resources/postgres-application.conf
new file mode 100644
index 000000000..b93acaf22
--- /dev/null
+++ b/migrator/src/test/resources/postgres-application.conf
@@ -0,0 +1,48 @@
+# Copyright (C) 2019 - 2021 Lightbend Inc.
+
+// general.conf is included only for shared settings used for the akka-persistence-jdbc tests
+include "general.conf"
+
+akka {
+ persistence {
+ journal {
+ plugin = "jdbc-journal"
+ // Enable the line below to automatically start the journal when the actorsystem is started
+ // auto-start-journals = ["jdbc-journal"]
+ }
+ snapshot-store {
+ plugin = "jdbc-snapshot-store"
+ // Enable the line below to automatically start the snapshot-store when the actorsystem is started
+ // auto-start-snapshot-stores = ["jdbc-snapshot-store"]
+ }
+ }
+}
+
+jdbc-journal {
+ slick = ${slick}
+}
+
+# the akka-persistence-snapshot-store in use
+jdbc-snapshot-store {
+ slick = ${slick}
+}
+
+# the akka-persistence-query provider in use
+jdbc-read-journal {
+ slick = ${slick}
+}
+
+slick {
+ profile = "slick.jdbc.PostgresProfile$"
+ db {
+ host = "localhost"
+ host = ${?DB_HOST}
+ url = "jdbc:postgresql://"${slick.db.host}":5432/docker?reWriteBatchedInserts=true"
+ user = "docker"
+ password = "docker"
+ driver = "org.postgresql.Driver"
+ numThreads = 5
+ maxConnections = 5
+ minConnections = 1
+ }
+}
diff --git a/migrator/src/test/resources/schema/h2/h2-create-schema-legacy.sql b/migrator/src/test/resources/schema/h2/h2-create-schema-legacy.sql
new file mode 100644
index 000000000..2a82b090d
--- /dev/null
+++ b/migrator/src/test/resources/schema/h2/h2-create-schema-legacy.sql
@@ -0,0 +1,34 @@
+CREATE TABLE IF NOT EXISTS PUBLIC."journal" (
+ "ordering" BIGINT AUTO_INCREMENT,
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" BIGINT NOT NULL,
+ "deleted" BOOLEAN DEFAULT FALSE NOT NULL,
+ "tags" VARCHAR(255) DEFAULT NULL,
+ "message" BYTEA NOT NULL,
+ PRIMARY KEY("persistence_id", "sequence_number")
+);
+CREATE UNIQUE INDEX IF NOT EXISTS "journal_ordering_idx" ON PUBLIC."journal"("ordering");
+
+CREATE TABLE IF NOT EXISTS PUBLIC."legacy_snapshot" (
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" BIGINT NOT NULL,
+ "created" BIGINT NOT NULL,
+ "snapshot" BYTEA NOT NULL,
+ PRIMARY KEY("persistence_id", "sequence_number")
+);
+
+
+CREATE TABLE IF NOT EXISTS "durable_state" (
+ "global_offset" BIGINT NOT NULL AUTO_INCREMENT,
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "revision" BIGINT NOT NULL,
+ "state_payload" BLOB NOT NULL,
+ "state_serial_id" INTEGER NOT NULL,
+ "state_serial_manifest" VARCHAR,
+ "tag" VARCHAR,
+ "state_timestamp" BIGINT NOT NULL,
+ PRIMARY KEY("persistence_id")
+ );
+
+CREATE INDEX "state_tag_idx" on "durable_state" ("tag");
+CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset");
diff --git a/migrator/src/test/resources/schema/h2/h2-create-schema.sql b/migrator/src/test/resources/schema/h2/h2-create-schema.sql
new file mode 100644
index 000000000..5167d1b92
--- /dev/null
+++ b/migrator/src/test/resources/schema/h2/h2-create-schema.sql
@@ -0,0 +1,54 @@
+CREATE TABLE IF NOT EXISTS "event_journal" (
+ "ordering" BIGINT NOT NULL AUTO_INCREMENT,
+ "deleted" BOOLEAN DEFAULT false NOT NULL,
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" BIGINT NOT NULL,
+ "writer" VARCHAR NOT NULL,
+ "write_timestamp" BIGINT NOT NULL,
+ "adapter_manifest" VARCHAR NOT NULL,
+ "event_payload" BLOB NOT NULL,
+ "event_ser_id" INTEGER NOT NULL,
+ "event_ser_manifest" VARCHAR NOT NULL,
+ "meta_payload" BLOB,
+ "meta_ser_id" INTEGER,
+ "meta_ser_manifest" VARCHAR,
+ PRIMARY KEY("persistence_id","sequence_number")
+ );
+
+CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering");
+
+CREATE TABLE IF NOT EXISTS "event_tag" (
+ "event_id" BIGINT NOT NULL,
+ "tag" VARCHAR NOT NULL,
+ PRIMARY KEY("event_id", "tag"),
+ CONSTRAINT fk_event_journal
+ FOREIGN KEY("event_id")
+ REFERENCES "event_journal"("ordering")
+ ON DELETE CASCADE
+);
+
+CREATE TABLE IF NOT EXISTS "snapshot" (
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" BIGINT NOT NULL,
+ "created" BIGINT NOT NULL,"snapshot_ser_id" INTEGER NOT NULL,
+ "snapshot_ser_manifest" VARCHAR NOT NULL,
+ "snapshot_payload" BLOB NOT NULL,
+ "meta_ser_id" INTEGER,
+ "meta_ser_manifest" VARCHAR,
+ "meta_payload" BLOB,
+ PRIMARY KEY("persistence_id","sequence_number")
+ );
+
+CREATE TABLE IF NOT EXISTS "durable_state" (
+ "global_offset" BIGINT NOT NULL AUTO_INCREMENT,
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "revision" BIGINT NOT NULL,
+ "state_payload" BLOB NOT NULL,
+ "state_serial_id" INTEGER NOT NULL,
+ "state_serial_manifest" VARCHAR,
+ "tag" VARCHAR,
+ "state_timestamp" BIGINT NOT NULL,
+ PRIMARY KEY("persistence_id")
+ );
+CREATE INDEX "state_tag_idx" on "durable_state" ("tag");
+CREATE INDEX "state_global_offset_idx" on "durable_state" ("global_offset");
diff --git a/migrator/src/test/resources/schema/h2/h2-drop-schema-legacy.sql b/migrator/src/test/resources/schema/h2/h2-drop-schema-legacy.sql
new file mode 100644
index 000000000..499ed5b29
--- /dev/null
+++ b/migrator/src/test/resources/schema/h2/h2-drop-schema-legacy.sql
@@ -0,0 +1,3 @@
+DROP TABLE IF EXISTS PUBLIC."journal";
+DROP TABLE IF EXISTS PUBLIC."legacy_snapshot";
+DROP TABLE IF EXISTS PUBLIC."durable_state";
diff --git a/migrator/src/test/resources/schema/h2/h2-drop-schema.sql b/migrator/src/test/resources/schema/h2/h2-drop-schema.sql
new file mode 100644
index 000000000..3d5ab8e97
--- /dev/null
+++ b/migrator/src/test/resources/schema/h2/h2-drop-schema.sql
@@ -0,0 +1,4 @@
+DROP TABLE IF EXISTS PUBLIC."event_tag";
+DROP TABLE IF EXISTS PUBLIC."event_journal";
+DROP TABLE IF EXISTS PUBLIC."snapshot";
+DROP TABLE IF EXISTS PUBLIC."durable_state";
diff --git a/migrator/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql b/migrator/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql
new file mode 100644
index 000000000..841e65561
--- /dev/null
+++ b/migrator/src/test/resources/schema/mysql/mysql-create-schema-legacy.sql
@@ -0,0 +1,18 @@
+CREATE TABLE IF NOT EXISTS journal (
+ ordering SERIAL,
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ tags VARCHAR(255) DEFAULT NULL,
+ message BLOB NOT NULL,
+ PRIMARY KEY(persistence_id, sequence_number)
+);
+CREATE UNIQUE INDEX journal_ordering_idx ON journal(ordering);
+
+CREATE TABLE IF NOT EXISTS legacy_snapshot (
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ created BIGINT NOT NULL,
+ snapshot BLOB NOT NULL,
+ PRIMARY KEY (persistence_id, sequence_number)
+);
diff --git a/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql b/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql
new file mode 100644
index 000000000..5c57be277
--- /dev/null
+++ b/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql
@@ -0,0 +1,38 @@
+CREATE TABLE IF NOT EXISTS event_journal(
+ ordering SERIAL,
+ deleted BOOLEAN DEFAULT false NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ writer TEXT NOT NULL,
+ write_timestamp BIGINT NOT NULL,
+ adapter_manifest TEXT NOT NULL,
+ event_payload BLOB NOT NULL,
+ event_ser_id INTEGER NOT NULL,
+ event_ser_manifest TEXT NOT NULL,
+ meta_payload BLOB,
+ meta_ser_id INTEGER,meta_ser_manifest TEXT,
+ PRIMARY KEY(persistence_id,sequence_number)
+);
+
+CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);
+
+CREATE TABLE IF NOT EXISTS event_tag (
+ event_id BIGINT UNSIGNED NOT NULL,
+ tag VARCHAR(255) NOT NULL,
+ PRIMARY KEY(event_id, tag),
+ FOREIGN KEY (event_id)
+ REFERENCES event_journal(ordering)
+ ON DELETE CASCADE
+ );
+
+CREATE TABLE IF NOT EXISTS snapshot (
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ created BIGINT NOT NULL,
+ snapshot_ser_id INTEGER NOT NULL,
+ snapshot_ser_manifest TEXT NOT NULL,
+ snapshot_payload BLOB NOT NULL,
+ meta_ser_id INTEGER,
+ meta_ser_manifest TEXT,
+ meta_payload BLOB,
+ PRIMARY KEY (persistence_id, sequence_number));
diff --git a/migrator/src/test/resources/schema/mysql/mysql-drop-schema-legacy.sql b/migrator/src/test/resources/schema/mysql/mysql-drop-schema-legacy.sql
new file mode 100644
index 000000000..7a3cc849f
--- /dev/null
+++ b/migrator/src/test/resources/schema/mysql/mysql-drop-schema-legacy.sql
@@ -0,0 +1,2 @@
+DROP TABLE IF EXISTS journal;
+DROP TABLE IF EXISTS legacy_snapshot;
diff --git a/migrator/src/test/resources/schema/mysql/mysql-drop-schema.sql b/migrator/src/test/resources/schema/mysql/mysql-drop-schema.sql
new file mode 100644
index 000000000..750504e76
--- /dev/null
+++ b/migrator/src/test/resources/schema/mysql/mysql-drop-schema.sql
@@ -0,0 +1,3 @@
+DROP TABLE IF EXISTS event_tag;
+DROP TABLE IF EXISTS event_journal;
+DROP TABLE IF EXISTS snapshot;
diff --git a/migrator/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql b/migrator/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql
new file mode 100644
index 000000000..8cbb05988
--- /dev/null
+++ b/migrator/src/test/resources/schema/oracle/oracle-create-schema-legacy.sql
@@ -0,0 +1,44 @@
+CREATE SEQUENCE "ordering_seq" START WITH 1 INCREMENT BY 1 NOMAXVALUE
+/
+
+CREATE TABLE "journal" (
+ "ordering" NUMERIC,
+ "deleted" char check ("deleted" in (0,1)) NOT NULL,
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" NUMERIC NOT NULL,
+ "tags" VARCHAR(255) DEFAULT NULL,
+ "message" BLOB NOT NULL,
+ PRIMARY KEY("persistence_id", "sequence_number")
+)
+/
+
+CREATE UNIQUE INDEX "journal_ordering_idx" ON "journal"("ordering")
+/
+
+CREATE OR REPLACE TRIGGER "ordering_seq_trigger"
+BEFORE INSERT ON "journal"
+FOR EACH ROW
+BEGIN
+ SELECT "ordering_seq".NEXTVAL INTO :NEW."ordering" FROM DUAL;
+END;
+/
+
+CREATE OR REPLACE PROCEDURE "reset_sequence"
+IS
+ l_value NUMBER;
+BEGIN
+ EXECUTE IMMEDIATE 'SELECT "ordering_seq".nextval FROM dual' INTO l_value;
+ EXECUTE IMMEDIATE 'ALTER SEQUENCE "ordering_seq" INCREMENT BY -' || l_value || ' MINVALUE 0';
+ EXECUTE IMMEDIATE 'SELECT "ordering_seq".nextval FROM dual' INTO l_value;
+ EXECUTE IMMEDIATE 'ALTER SEQUENCE "ordering_seq" INCREMENT BY 1 MINVALUE 0';
+END;
+/
+
+CREATE TABLE "legacy_snapshot" (
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" NUMERIC NOT NULL,
+ "created" NUMERIC NOT NULL,
+ "snapshot" BLOB NOT NULL,
+ PRIMARY KEY ("persistence_id", "sequence_number")
+)
+/
\ No newline at end of file
diff --git a/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql b/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql
new file mode 100644
index 000000000..dde92755f
--- /dev/null
+++ b/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql
@@ -0,0 +1,57 @@
+CREATE SEQUENCE EVENT_JOURNAL__ORDERING_SEQ START WITH 1 INCREMENT BY 1 NOMAXVALUE
+/
+
+CREATE TABLE EVENT_JOURNAL (
+ ORDERING NUMERIC UNIQUE,
+ DELETED CHAR(1) DEFAULT 0 NOT NULL check (DELETED in (0, 1)),
+ PERSISTENCE_ID VARCHAR(255) NOT NULL,
+ SEQUENCE_NUMBER NUMERIC NOT NULL,
+ WRITER VARCHAR(255) NOT NULL,
+ WRITE_TIMESTAMP NUMBER(19) NOT NULL,
+ ADAPTER_MANIFEST VARCHAR(255),
+ EVENT_PAYLOAD BLOB NOT NULL,
+ EVENT_SER_ID NUMBER(10) NOT NULL,
+ EVENT_SER_MANIFEST VARCHAR(255),
+ META_PAYLOAD BLOB,
+ META_SER_ID NUMBER(10),
+ META_SER_MANIFEST VARCHAR(255),
+ PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER)
+ )
+/
+
+CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOURNAL REFERENCING NEW AS NEW FOR EACH ROW WHEN (new.ORDERING is null) begin select EVENT_JOURNAL__ORDERING_seq.nextval into :new.ORDERING from sys.dual; end;
+/
+
+CREATE TABLE EVENT_TAG (
+ EVENT_ID NUMERIC NOT NULL,
+ TAG VARCHAR(255) NOT NULL,
+ PRIMARY KEY(EVENT_ID, TAG),
+ FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING)
+ ON DELETE CASCADE
+ )
+/
+
+CREATE TABLE SNAPSHOT (
+ PERSISTENCE_ID VARCHAR(255) NOT NULL,
+ SEQUENCE_NUMBER NUMERIC NOT NULL,
+ CREATED NUMERIC NOT NULL,
+ SNAPSHOT_SER_ID NUMBER(10) NOT NULL,
+ SNAPSHOT_SER_MANIFEST VARCHAR(255),
+ SNAPSHOT_PAYLOAD BLOB NOT NULL,
+ META_SER_ID NUMBER(10),
+ META_SER_MANIFEST VARCHAR(255),
+ META_PAYLOAD BLOB,
+ PRIMARY KEY(PERSISTENCE_ID,SEQUENCE_NUMBER)
+ )
+/
+
+CREATE OR REPLACE PROCEDURE "reset_sequence"
+IS
+ l_value NUMBER;
+BEGIN
+ EXECUTE IMMEDIATE 'SELECT EVENT_JOURNAL__ORDERING_SEQ.nextval FROM dual' INTO l_value;
+ EXECUTE IMMEDIATE 'ALTER SEQUENCE EVENT_JOURNAL__ORDERING_SEQ INCREMENT BY -' || l_value || ' MINVALUE 0';
+ EXECUTE IMMEDIATE 'SELECT EVENT_JOURNAL__ORDERING_SEQ.nextval FROM dual' INTO l_value;
+ EXECUTE IMMEDIATE 'ALTER SEQUENCE EVENT_JOURNAL__ORDERING_SEQ INCREMENT BY 1 MINVALUE 0';
+END;
+/
diff --git a/migrator/src/test/resources/schema/oracle/oracle-drop-schema-legacy.sql b/migrator/src/test/resources/schema/oracle/oracle-drop-schema-legacy.sql
new file mode 100644
index 000000000..0d2ef1100
--- /dev/null
+++ b/migrator/src/test/resources/schema/oracle/oracle-drop-schema-legacy.sql
@@ -0,0 +1,21 @@
+-- (ddl lock timeout in seconds) this allows tests which are still writing to the db to finish gracefully
+ALTER SESSION SET ddl_lock_timeout = 150
+/
+
+DROP TABLE "journal" CASCADE CONSTRAINT
+/
+
+DROP TABLE "legacy_snapshot" CASCADE CONSTRAINT
+/
+
+DROP TABLE "deleted_to" CASCADE CONSTRAINT
+/
+
+DROP TRIGGER "ordering_seq_trigger"
+/
+
+DROP PROCEDURE "reset_sequence"
+/
+
+DROP SEQUENCE "ordering_seq"
+/
diff --git a/migrator/src/test/resources/schema/oracle/oracle-drop-schema.sql b/migrator/src/test/resources/schema/oracle/oracle-drop-schema.sql
new file mode 100644
index 000000000..ed69f1f0d
--- /dev/null
+++ b/migrator/src/test/resources/schema/oracle/oracle-drop-schema.sql
@@ -0,0 +1,17 @@
+ALTER SESSION SET ddl_lock_timeout = 15
+/
+
+DROP TABLE EVENT_TAG CASCADE CONSTRAINT
+/
+
+DROP TABLE EVENT_JOURNAL CASCADE CONSTRAINT
+/
+
+DROP TABLE SNAPSHOT CASCADE CONSTRAINT
+/
+
+DROP SEQUENCE EVENT_JOURNAL__ORDERING_SEQ
+/
+
+DROP TRIGGER EVENT_JOURNAL__ORDERING_TRG
+/
diff --git a/migrator/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql b/migrator/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql
new file mode 100644
index 000000000..123d5dea7
--- /dev/null
+++ b/migrator/src/test/resources/schema/postgres/postgres-create-schema-legacy.sql
@@ -0,0 +1,32 @@
+CREATE TABLE IF NOT EXISTS public.journal (
+ ordering BIGSERIAL,
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ tags VARCHAR(255) DEFAULT NULL,
+ message BYTEA NOT NULL,
+ PRIMARY KEY(persistence_id, sequence_number)
+);
+CREATE UNIQUE INDEX IF NOT EXISTS journal_ordering_idx ON public.journal(ordering);
+
+CREATE TABLE IF NOT EXISTS public.legacy_snapshot (
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ created BIGINT NOT NULL,
+ snapshot BYTEA NOT NULL,
+ PRIMARY KEY(persistence_id, sequence_number)
+);
+
+CREATE TABLE IF NOT EXISTS public.durable_state (
+ global_offset BIGSERIAL,
+ persistence_id VARCHAR(255) NOT NULL,
+ revision BIGINT NOT NULL,
+ state_payload BYTEA NOT NULL,
+ state_serial_id INTEGER NOT NULL,
+ state_serial_manifest VARCHAR(255),
+ tag VARCHAR,
+ state_timestamp BIGINT NOT NULL,
+ PRIMARY KEY(persistence_id)
+ );
+CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag);
+CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset);
diff --git a/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql b/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql
new file mode 100644
index 000000000..7ae7e0999
--- /dev/null
+++ b/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql
@@ -0,0 +1,62 @@
+CREATE TABLE IF NOT EXISTS public.event_journal(
+ ordering BIGSERIAL,
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+
+ writer VARCHAR(255) NOT NULL,
+ write_timestamp BIGINT,
+ adapter_manifest VARCHAR(255),
+
+ event_ser_id INTEGER NOT NULL,
+ event_ser_manifest VARCHAR(255) NOT NULL,
+ event_payload BYTEA NOT NULL,
+
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id, sequence_number)
+);
+
+CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering);
+
+CREATE TABLE IF NOT EXISTS public.event_tag(
+ event_id BIGINT,
+ tag VARCHAR(256),
+ PRIMARY KEY(event_id, tag),
+ CONSTRAINT fk_event_journal
+ FOREIGN KEY(event_id)
+ REFERENCES event_journal(ordering)
+ ON DELETE CASCADE
+);
+
+CREATE TABLE IF NOT EXISTS public.snapshot (
+ persistence_id VARCHAR(255) NOT NULL,
+ sequence_number BIGINT NOT NULL,
+ created BIGINT NOT NULL,
+
+ snapshot_ser_id INTEGER NOT NULL,
+ snapshot_ser_manifest VARCHAR(255) NOT NULL,
+ snapshot_payload BYTEA NOT NULL,
+
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id, sequence_number)
+);
+
+CREATE TABLE IF NOT EXISTS public.durable_state (
+ global_offset BIGSERIAL,
+ persistence_id VARCHAR(255) NOT NULL,
+ revision BIGINT NOT NULL,
+ state_payload BYTEA NOT NULL,
+ state_serial_id INTEGER NOT NULL,
+ state_serial_manifest VARCHAR(255),
+ tag VARCHAR,
+ state_timestamp BIGINT NOT NULL,
+ PRIMARY KEY(persistence_id)
+ );
+CREATE INDEX CONCURRENTLY state_tag_idx on public.durable_state (tag);
+CREATE INDEX CONCURRENTLY state_global_offset_idx on public.durable_state (global_offset);
diff --git a/migrator/src/test/resources/schema/postgres/postgres-drop-schema-legacy.sql b/migrator/src/test/resources/schema/postgres/postgres-drop-schema-legacy.sql
new file mode 100644
index 000000000..ae824f1b8
--- /dev/null
+++ b/migrator/src/test/resources/schema/postgres/postgres-drop-schema-legacy.sql
@@ -0,0 +1,3 @@
+DROP TABLE IF EXISTS public.journal;
+DROP TABLE IF EXISTS public.legacy_snapshot;
+DROP TABLE IF EXISTS public.durable_state;
diff --git a/migrator/src/test/resources/schema/postgres/postgres-drop-schema.sql b/migrator/src/test/resources/schema/postgres/postgres-drop-schema.sql
new file mode 100644
index 000000000..01cb9b461
--- /dev/null
+++ b/migrator/src/test/resources/schema/postgres/postgres-drop-schema.sql
@@ -0,0 +1,5 @@
+DROP TABLE IF EXISTS public.event_tag;
+DROP TABLE IF EXISTS public.event_journal;
+DROP TABLE IF EXISTS public.snapshot;
+DROP TABLE IF EXISTS public.durable_state;
+
diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql
new file mode 100644
index 000000000..12ba4d411
--- /dev/null
+++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema-legacy.sql
@@ -0,0 +1,25 @@
+IF NOT EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID(N'"journal"') AND type in (N'U'))
+begin
+CREATE TABLE journal (
+ "ordering" BIGINT IDENTITY(1,1) NOT NULL,
+ "deleted" BIT DEFAULT 0 NOT NULL,
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" NUMERIC(10,0) NOT NULL,
+ "tags" VARCHAR(255) NULL DEFAULT NULL,
+ "message" VARBINARY(max) NOT NULL,
+ PRIMARY KEY ("persistence_id", "sequence_number")
+)
+CREATE UNIQUE INDEX journal_ordering_idx ON journal (ordering)
+end;
+
+
+IF NOT EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID(N'"legacy_snapshot"') AND type in (N'U'))
+begin
+CREATE TABLE legacy_snapshot (
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" NUMERIC(10,0) NOT NULL,
+ "created" NUMERIC NOT NULL,
+ "snapshot" VARBINARY(max) NOT NULL,
+ PRIMARY KEY ("persistence_id", "sequence_number")
+);
+end;
diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql
new file mode 100644
index 000000000..f4cf59f18
--- /dev/null
+++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql
@@ -0,0 +1,42 @@
+CREATE TABLE event_journal(
+ "ordering" BIGINT IDENTITY(1,1) NOT NULL,
+ "deleted" BIT DEFAULT 0 NOT NULL,
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" NUMERIC(10,0) NOT NULL,
+ "writer" VARCHAR(255) NOT NULL,
+ "write_timestamp" BIGINT NOT NULL,
+ "adapter_manifest" VARCHAR(MAX) NOT NULL,
+ "event_payload" VARBINARY(MAX) NOT NULL,
+ "event_ser_id" INTEGER NOT NULL,
+ "event_ser_manifest" VARCHAR(MAX) NOT NULL,
+ "meta_payload" VARBINARY(MAX),
+ "meta_ser_id" INTEGER,
+    "meta_ser_manifest" VARCHAR(MAX),
+ PRIMARY KEY ("persistence_id", "sequence_number")
+);
+
+CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);
+
+CREATE TABLE event_tag (
+ "event_id" BIGINT NOT NULL,
+    "tag" VARCHAR(255) NOT NULL,
+    PRIMARY KEY ("event_id","tag"),
+ constraint "fk_event_journal"
+ foreign key("event_id")
+ references "dbo"."event_journal"("ordering")
+ on delete CASCADE
+);
+
+CREATE TABLE "snapshot" (
+ "persistence_id" VARCHAR(255) NOT NULL,
+ "sequence_number" NUMERIC(10,0) NOT NULL,
+ "created" BIGINT NOT NULL,
+ "snapshot_ser_id" INTEGER NOT NULL,
+ "snapshot_ser_manifest" VARCHAR(255) NOT NULL,
+ "snapshot_payload" VARBINARY(MAX) NOT NULL,
+ "meta_ser_id" INTEGER,
+ "meta_ser_manifest" VARCHAR(255),
+ "meta_payload" VARBINARY(MAX),
+ PRIMARY KEY ("persistence_id", "sequence_number")
+ )
+
diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema-legacy.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema-legacy.sql
new file mode 100644
index 000000000..7a3cc849f
--- /dev/null
+++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema-legacy.sql
@@ -0,0 +1,2 @@
+DROP TABLE IF EXISTS journal;
+DROP TABLE IF EXISTS legacy_snapshot;
diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema.sql
new file mode 100644
index 000000000..750504e76
--- /dev/null
+++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-drop-schema.sql
@@ -0,0 +1,3 @@
+DROP TABLE IF EXISTS event_tag;
+DROP TABLE IF EXISTS event_journal;
+DROP TABLE IF EXISTS snapshot;
diff --git a/migrator/src/test/resources/sqlserver-application.conf b/migrator/src/test/resources/sqlserver-application.conf
new file mode 100644
index 000000000..0298fd82d
--- /dev/null
+++ b/migrator/src/test/resources/sqlserver-application.conf
@@ -0,0 +1,65 @@
+# Copyright (C) 2019 - 2021 Lightbend Inc.
+
+include "general.conf"
+
+akka {
+ persistence {
+ journal {
+ plugin = "jdbc-journal"
+ // Enable the line below to automatically start the journal when the actorsystem is started
+ // auto-start-journals = ["jdbc-journal"]
+ }
+ snapshot-store {
+ plugin = "jdbc-snapshot-store"
+ // Enable the line below to automatically start the snapshot-store when the actorsystem is started
+ // auto-start-snapshot-stores = ["jdbc-snapshot-store"]
+ }
+ }
+}
+
+jdbc-journal {
+ tables {
+ journal {
+ schemaName = "dbo"
+ }
+ }
+
+ slick = ${slick}
+}
+
+# the akka-persistence-snapshot-store in use
+jdbc-snapshot-store {
+ tables {
+ snapshot {
+ schemaName = "dbo"
+ }
+ }
+
+ slick = ${slick}
+}
+
+# the akka-persistence-query provider in use
+jdbc-read-journal {
+ tables {
+ journal {
+ schemaName = "dbo"
+ }
+ }
+
+ slick = ${slick}
+}
+
+slick {
+ profile = "slick.jdbc.SQLServerProfile$"
+ db {
+ host = ${docker.host}
+ host = ${?DB_HOST}
+ url = "jdbc:sqlserver://"${slick.db.host}":1433;databaseName=docker;integratedSecurity=false"
+ user = "docker"
+ password = "docker"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ numThreads = 5
+ maxConnections = 5
+ minConnections = 1
+ }
+}
diff --git a/migrator/src/test/scala/akka/persistence/jdbc/migrator/JournalMigratorTest.scala b/migrator/src/test/scala/akka/persistence/jdbc/migrator/JournalMigratorTest.scala
new file mode 100644
index 000000000..b000ee293
--- /dev/null
+++ b/migrator/src/test/scala/akka/persistence/jdbc/migrator/JournalMigratorTest.scala
@@ -0,0 +1,128 @@
+package akka.persistence.jdbc.migrator
+
+import akka.Done
+import akka.pattern.ask
+import akka.persistence.jdbc.db.SlickDatabase
+import akka.persistence.jdbc.migrator.MigratorSpec._
+
+abstract class JournalMigratorTest(configName: String) extends MigratorSpec(configName) {
+
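+  // Each test first persists events through the legacy schema (withLegacyActorSystem), then runs the
+  // migrator against the same database and verifies the new schema (withActorSystem) sees the same data.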
+ it should "migrate the event journal" in {
+ withLegacyActorSystem { implicit systemLegacy =>
+ withReadJournal { implicit readJournal =>
+ withTestActors() { (actorA1, actorA2, actorA3) =>
+ eventually {
+ countJournal().futureValue shouldBe 0
+ (actorA1 ? CreateAccount(1)).futureValue //balance 1
+ (actorA2 ? CreateAccount(2)).futureValue //balance 2
+ (actorA3 ? CreateAccount(3)).futureValue //balance 3
+ (actorA1 ? Deposit(3)).futureValue //balance 4
+ (actorA2 ? Deposit(2)).futureValue //balance 4
+ (actorA3 ? Deposit(1)).futureValue //balance 4
+ (actorA1 ? Withdraw(3)).futureValue //balance 1
+          (actorA2 ? Withdraw(2)).futureValue //balance 2
+          (actorA3 ? Withdraw(1)).futureValue //balance 3
+ (actorA1 ? State).mapTo[Int].futureValue shouldBe 1
+ (actorA2 ? State).mapTo[Int].futureValue shouldBe 2
+ (actorA3 ? State).mapTo[Int].futureValue shouldBe 3
+ countJournal().futureValue shouldBe 9
+ }
+ }
+ }
+ } // legacy persistence
+ withActorSystem { implicit systemNew =>
+ withReadJournal { implicit readJournal =>
+ eventually {
+ countJournal().futureValue shouldBe 0 // before migration
+ JournalMigrator(SlickDatabase.profile(config, "slick")).migrate().futureValue shouldBe Done
+ countJournal().futureValue shouldBe 9 // after migration
+ }
+ withTestActors() { (actorB1, actorB2, actorB3) =>
+ eventually {
+ (actorB1 ? State).mapTo[Int].futureValue shouldBe 1
+ (actorB2 ? State).mapTo[Int].futureValue shouldBe 2
+ (actorB3 ? State).mapTo[Int].futureValue shouldBe 3
+ }
+ }
+ }
+ } // new persistence
+ }
+
+ it should "migrate the event journal preserving the order of events" in {
+ withLegacyActorSystem { implicit systemLegacy =>
+ withReadJournal { implicit readJournal =>
+ withTestActors() { (actorA1, actorA2, actorA3) =>
+ (actorA1 ? CreateAccount(0)).futureValue
+ (actorA2 ? CreateAccount(0)).futureValue
+ (actorA3 ? CreateAccount(0)).futureValue
+ for (i <- 1 to 999) {
+ (actorA1 ? Deposit(i)).futureValue
+ (actorA2 ? Deposit(i)).futureValue
+ (actorA3 ? Deposit(i)).futureValue
+ }
+ eventually {
+ countJournal().futureValue shouldBe 3000
+ }
+ }
+ }
+ } // legacy persistence
+ withActorSystem { implicit systemNew =>
+ withReadJournal { implicit readJournal =>
+ eventually {
+ countJournal().futureValue shouldBe 0 // before migration
+ JournalMigrator(SlickDatabase.profile(config, "slick")).migrate().futureValue shouldBe Done
+ countJournal().futureValue shouldBe 3000 // after migration
+ val allEvents: Seq[Seq[AccountEvent]] = events().futureValue
+ allEvents.size shouldBe 3
+ val seq1: Seq[Int] = allEvents.head.map(_.amount)
+ val seq2: Seq[Int] = allEvents(1).map(_.amount)
+ val seq3: Seq[Int] = allEvents(2).map(_.amount)
+ val expectedResult: Seq[Int] = 0 to 999
+ seq1 shouldBe expectedResult
+ seq2 shouldBe expectedResult
+ seq3 shouldBe expectedResult
+ }
+ }
+ } // new persistence
+ }
+
+ it should "migrate the event journal preserving tags" in {
+ withLegacyActorSystem { implicit systemLegacy =>
+ withReadJournal { implicit readJournal =>
+ withTestActors() { (actorA1, actorA2, actorA3) =>
+ (actorA1 ? CreateAccount(0)).futureValue
+ (actorA2 ? CreateAccount(0)).futureValue
+ (actorA3 ? CreateAccount(0)).futureValue
+ for (i <- 1 to 999) {
+ (actorA1 ? Deposit(i)).futureValue
+ (actorA2 ? Deposit(i)).futureValue
+ (actorA3 ? Deposit(i)).futureValue
+ }
+ eventually {
+ countJournal().futureValue shouldBe 3000
+ }
+ }
+ }
+ } // legacy persistence
+ withActorSystem { implicit systemNew =>
+ withReadJournal { implicit readJournal =>
+ eventually {
+ countJournal().futureValue shouldBe 0 // before migration
+ JournalMigrator(SlickDatabase.profile(config, "slick")).migrate().futureValue shouldBe Done
+ countJournal().futureValue shouldBe 3000 // after migration
+ val evenEvents: Seq[AccountEvent] = eventsByTag(MigratorSpec.Even).futureValue
+ evenEvents.size shouldBe 1500
+ evenEvents.forall(e => e.amount % 2 == 0) shouldBe true
+
+ val oddEvents: Seq[AccountEvent] = eventsByTag(MigratorSpec.Odd).futureValue
+ oddEvents.size shouldBe 1500
+ oddEvents.forall(e => e.amount % 2 == 1) shouldBe true
+ }
+ }
+ } // new persistence
+ }
+}
+
+class H2JournalMigratorTest extends JournalMigratorTest("h2-application.conf") with MigratorSpec.H2Cleaner
diff --git a/migrator/src/test/scala/akka/persistence/jdbc/migrator/MigratorSpec.scala b/migrator/src/test/scala/akka/persistence/jdbc/migrator/MigratorSpec.scala
new file mode 100644
index 000000000..1f512aa8d
--- /dev/null
+++ b/migrator/src/test/scala/akka/persistence/jdbc/migrator/MigratorSpec.scala
@@ -0,0 +1,396 @@
+package akka.persistence.jdbc.migrator
+
+import akka.actor.{ ActorRef, ActorSystem, Props, Stash }
+import akka.event.LoggingReceive
+import akka.pattern.ask
+import akka.persistence.jdbc.SimpleSpec
+import akka.persistence.jdbc.config.{ JournalConfig, SlickConfiguration }
+import akka.persistence.jdbc.db.SlickDatabase
+import akka.persistence.jdbc.migrator.MigratorSpec._
+import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
+import akka.persistence.jdbc.testkit.internal._
+import akka.persistence.journal.EventSeq.single
+import akka.persistence.journal.{ EventAdapter, EventSeq, Tagged }
+import akka.persistence.query.PersistenceQuery
+import akka.persistence.{ PersistentActor, SaveSnapshotSuccess, SnapshotMetadata, SnapshotOffer }
+import akka.stream.Materializer
+import akka.stream.scaladsl.Sink
+import akka.util.Timeout
+import com.typesafe.config.{ Config, ConfigFactory, ConfigValue, ConfigValueFactory }
+import org.scalatest.BeforeAndAfterEach
+import org.slf4j.{ Logger, LoggerFactory }
+import slick.jdbc.JdbcBackend.{ Database, Session }
+
+import java.sql.Statement
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.{ ExecutionContextExecutor, Future }
+
+abstract class MigratorSpec(val config: Config) extends SimpleSpec with BeforeAndAfterEach {
+
+ // The db is created lazily on first access and closed again in the afterEach/afterAll blocks
+ var dbOpt: Option[Database] = None
+
+ implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds)
+ implicit val timeout: Timeout = Timeout(1.minute)
+
+ private val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+ private val cfg: Config = config.getConfig("jdbc-journal")
+ private val journalConfig: JournalConfig = new JournalConfig(cfg)
+
+ protected val newJournalTableName: String = journalConfig.eventJournalTableConfiguration.tableName
+ protected val legacyJournalTableName: String = journalConfig.journalTableConfiguration.tableName
+
+ protected val newTables: Seq[String] =
+ List(journalConfig.eventTagTableConfiguration.tableName, journalConfig.eventJournalTableConfiguration.tableName)
+ protected val legacyTables: Seq[String] = List(journalConfig.journalTableConfiguration.tableName)
+ protected val tables: Seq[String] = legacyTables ++ newTables
+
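+ /** Loads the given config resource and applies the supplied overrides on top of it. */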
+ def this(config: String = "postgres-application.conf", configOverrides: Map[String, ConfigValue] = Map.empty) =
+ this(configOverrides.foldLeft(ConfigFactory.load(config)) { case (conf, (path, configValue)) =>
+ conf.withValue(path, configValue)
+ })
+
+ def db: Database = dbOpt.getOrElse {
+ val db = SlickDatabase.database(cfg, new SlickConfiguration(cfg.getConfig("slick")), "slick.db")
+ dbOpt = Some(db)
+ db
+ }
+
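+ /** Drops and recreates both the legacy and the new schema for the given schema type. */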
+ protected def dropAndCreate(schemaType: SchemaType): Unit = {
+ // blocking calls, usually done in our beforeEach/beforeAll methods
+ // legacy
+ SchemaUtilsImpl.dropWithSlick(schemaType, logger, db, legacy = true)
+ SchemaUtilsImpl.createWithSlick(schemaType, logger, db, legacy = true)
+ // new
+ SchemaUtilsImpl.dropWithSlick(schemaType, logger, db, legacy = false)
+ SchemaUtilsImpl.createWithSlick(schemaType, logger, db, legacy = false)
+ }
+
+ def withSession[A](f: Session => A)(db: Database): A = {
+ val session = db.createSession()
+ try f(session)
+ finally session.close()
+ }
+
+ def withStatement[A](f: Statement => A)(db: Database): A =
+ withSession(session => session.withStatement()(f))(db)
+
+ def closeDb(): Unit = {
+ dbOpt.foreach(_.close())
+ dbOpt = None
+ }
+
+ override protected def afterEach(): Unit = {
+ super.afterEach()
+ closeDb()
+ }
+
+ override protected def afterAll(): Unit = {
+ super.afterAll()
+ closeDb()
+ }
+
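+ /** Spawns a TestAccountActor whose persistence id is derived from the given number. */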
+ protected def setupEmpty(persistenceId: Int)(implicit system: ActorSystem): ActorRef =
+ system.actorOf(Props(new TestAccountActor(persistenceId)))
+
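+ /** Creates three test actors with consecutive persistence ids starting at `seq` and passes them to `f` once they have started. */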
+ def withTestActors(seq: Int = 1)(f: (ActorRef, ActorRef, ActorRef) => Unit)(implicit system: ActorSystem): Unit = {
+ implicit val ec: ExecutionContextExecutor = system.dispatcher
+ val refs = (seq until seq + 3).map(setupEmpty).toList
+ try {
+ // Make sure we notice early if the actors failed to start (because of issues with the journal); this makes
+ // debugging failing tests easier, as we know the problem is not the actual interaction from the test.
+ Future.sequence(refs.map(_ ? State)).futureValue
+
+ f(refs.head, refs.drop(1).head, refs.drop(2).head)
+ } finally killActors(refs: _*)
+ }
+
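+ /** Runs `f` with an actor system that uses the default (new) DAOs and terminates the system afterwards. */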
+ def withActorSystem(f: ActorSystem => Unit): Unit = {
+ implicit val system: ActorSystem = ActorSystem("migrator-test", config)
+ f(system)
+ system.terminate().futureValue
+ }
+
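+ /** Runs `f` with an actor system whose journal, snapshot and read-journal DAOs are overridden to the legacy byte-array DAOs, then terminates it. */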
+ def withLegacyActorSystem(f: ActorSystem => Unit): Unit = {
+
+ val configOverrides: Map[String, ConfigValue] = Map(
+ "jdbc-journal.dao" -> ConfigValueFactory.fromAnyRef(
+ "akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao"),
+ "jdbc-snapshot-store.dao" -> ConfigValueFactory.fromAnyRef(
+ "akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao"),
+ "jdbc-read-journal.dao" -> ConfigValueFactory.fromAnyRef(
+ "akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao"))
+
+ val legacyDAOConfig = configOverrides.foldLeft(ConfigFactory.load(config)) { case (conf, (path, configValue)) =>
+ conf.withValue(path, configValue)
+ }
+
+ implicit val system: ActorSystem = ActorSystem("migrator-test", legacyDAOConfig)
+ f(system)
+ system.terminate().futureValue
+ }
+
+ def withReadJournal(f: JdbcReadJournal => Unit)(implicit system: ActorSystem): Unit = {
+ val readJournal: JdbcReadJournal =
+ PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
+ f(readJournal)
+ }
+
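+ /** Counts, through the read journal, all events of the persistence ids accepted by `filterPid`. */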
+ def countJournal(filterPid: String => Boolean = _ => true)(
+ implicit system: ActorSystem,
+ mat: Materializer,
+ readJournal: JdbcReadJournal): Future[Long] =
+ readJournal
+ .currentPersistenceIds()
+ .filter(filterPid(_))
+ .mapAsync(1) { pid =>
+ readJournal
+ .currentEventsByPersistenceId(pid, 0, Long.MaxValue)
+ .map(_ => 1L)
+ .runWith(Sink.seq)
+ .map(_.sum)(system.dispatcher)
+ }
+ .runWith(Sink.seq)
+ .map(_.sum)(system.dispatcher)
+
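+ /** Returns all AccountEvents currently carrying the given tag. */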
+ def eventsByTag(tag: String)(implicit mat: Materializer, readJournal: JdbcReadJournal): Future[Seq[AccountEvent]] =
+ readJournal
+ .currentEventsByTag(tag, offset = 0)
+ .map(_.event)
+ .collect { case e: AccountEvent =>
+ e
+ }
+ .runWith(Sink.seq)
+
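+ /** Returns, per persistence id accepted by `filterPid`, the ordered sequence of its AccountEvents. */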
+ def events(filterPid: String => Boolean = _ => true)(
+ implicit mat: Materializer,
+ readJournal: JdbcReadJournal): Future[Seq[Seq[AccountEvent]]] =
+ readJournal
+ .currentPersistenceIds()
+ .filter(filterPid(_))
+ .mapAsync(1) { pid =>
+ readJournal
+ .currentEventsByPersistenceId(pid, fromSequenceNr = 0, toSequenceNr = Long.MaxValue)
+ .map(e => e.event)
+ .collect { case e: AccountEvent =>
+ e
+ }
+ .runWith(Sink.seq)
+ }
+ .runWith(Sink.seq)
+
+}
+
+object MigratorSpec {
+
+ private final val Zero: Int = 0
+
+ private final val SnapshotInterval: Int = 10
+
+ val Even: String = "EVEN"
+ val Odd: String = "ODD"
+
+ /** Commands */
+ sealed trait AccountCommand extends Serializable
+
+ final case class CreateAccount(amount: Int) extends AccountCommand
+
+ final case class Deposit(amount: Int) extends AccountCommand
+
+ final case class Withdraw(amount: Int) extends AccountCommand
+
+ case object State extends AccountCommand
+
+ /** Events */
+ sealed trait AccountEvent extends Serializable {
+ val amount: Int
+ }
+
+ final case class AccountCreated(override val amount: Int) extends AccountEvent
+
+ final case class Deposited(override val amount: Int) extends AccountEvent
+
+ final case class Withdrawn(override val amount: Int) extends AccountEvent
+
+ /** Reply */
+ final case class CurrentBalance(balance: Int)
+
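+ /** Tags every AccountEvent with EVEN or ODD depending on the parity of its amount. */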
+ class AccountEventAdapter extends EventAdapter {
+
+ override def manifest(event: Any): String = event.getClass.getSimpleName
+
+ def fromJournal(event: Any, manifest: String): EventSeq = event match {
+ case event: AccountEvent => single(event)
+ case _ => sys.error(s"Unexpected case '${event.getClass.getName}'")
+ }
+
+ def toJournal(event: Any): Any = event match {
+ case event: AccountEvent =>
+ val tag: String = if (event.amount % 2 == 0) Even else Odd
+ Tagged(event, Set(tag))
+ case _ => sys.error(s"Unexpected case '${event.getClass.getName}'")
+ }
+ }
+
+ /** Actor */
+ class TestAccountActor(id: Int) extends PersistentActor with Stash {
+ override val persistenceId: String = s"test-account-$id"
+
+ var state: Int = Zero
+
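+ /** Saves a snapshot whenever the current balance is a multiple of SnapshotInterval. */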
+ private def saveSnapshot(): Unit = {
+ if (state % SnapshotInterval == 0) {
+ saveSnapshot(state)
+ }
+ }
+
+ override def receiveCommand: Receive =
+ LoggingReceive {
+
+ case SaveSnapshotSuccess(_: SnapshotMetadata) => ()
+
+ case CreateAccount(balance) =>
+ persist(AccountCreated(balance)) { (event: AccountCreated) =>
+ updateState(event)
+ saveSnapshot()
+ sender() ! akka.actor.Status.Success(event)
+ }
+ case Deposit(balance) =>
+ persist(Deposited(balance)) { (event: Deposited) =>
+ updateState(event)
+ saveSnapshot()
+ sender() ! akka.actor.Status.Success(event)
+ }
+ case Withdraw(balance) =>
+ persist(Withdrawn(balance)) { (event: Withdrawn) =>
+ updateState(event)
+ saveSnapshot()
+ sender() ! akka.actor.Status.Success(event)
+ }
+ case State =>
+ sender() ! akka.actor.Status.Success(state)
+ }
+
+ def updateState(event: AccountEvent): Unit = event match {
+ case AccountCreated(amount) => state = state + amount
+ case Deposited(amount) => state = state + amount
+ case Withdrawn(amount) => state = state - amount
+ }
+
+ override def receiveRecover: Receive =
+ LoggingReceive {
+ case SnapshotOffer(_, snapshot: Int) =>
+ state = snapshot
+ case event: AccountEvent => updateState(event)
+ }
+ }
+
+ trait PostgresCleaner extends MigratorSpec {
+
+ def clearPostgres(): Unit = {
+ tables.foreach { name =>
+ withStatement(stmt => stmt.executeUpdate(s"DELETE FROM $name"))(db)
+ }
+ }
+
+ override def beforeAll(): Unit = {
+ dropAndCreate(Postgres)
+ super.beforeAll()
+ }
+
+ override def beforeEach(): Unit = {
+ dropAndCreate(Postgres)
+ super.beforeEach()
+ }
+ }
+
+ trait MysqlCleaner extends MigratorSpec {
+
+ def clearMySQL(): Unit = {
+ withStatement { stmt =>
+ stmt.execute("SET FOREIGN_KEY_CHECKS = 0")
+ tables.foreach { name => stmt.executeUpdate(s"TRUNCATE $name") }
+ stmt.execute("SET FOREIGN_KEY_CHECKS = 1")
+ }(db)
+ }
+
+ override def beforeAll(): Unit = {
+ dropAndCreate(MySQL)
+ super.beforeAll()
+ }
+
+ override def beforeEach(): Unit = {
+ clearMySQL()
+ super.beforeEach()
+ }
+ }
+
+ trait OracleCleaner extends MigratorSpec {
+
+ def clearOracle(): Unit = {
+ tables.foreach { name =>
+ withStatement(stmt => stmt.executeUpdate(s"""DELETE FROM "$name" """))(db)
+ }
+ withStatement(stmt => stmt.executeUpdate("""BEGIN "reset_sequence"; END; """))(db)
+ }
+
+ override def beforeAll(): Unit = {
+ dropAndCreate(Oracle)
+ super.beforeAll()
+ }
+
+ override def beforeEach(): Unit = {
+ clearOracle()
+ super.beforeEach()
+ }
+ }
+
+ trait SqlServerCleaner extends MigratorSpec {
+
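+ // The identity columns are reseeded after every cleanup: to 1 on the very first run and to 0 afterwards.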
+ var initial = true
+
+ def clearSqlServer(): Unit = {
+ val reset = if (initial) {
+ initial = false
+ 1
+ } else {
+ 0
+ }
+ withStatement { stmt =>
+ tables.foreach { name => stmt.executeUpdate(s"DELETE FROM $name") }
+ stmt.executeUpdate(s"DBCC CHECKIDENT('$legacyJournalTableName', RESEED, $reset)")
+ stmt.executeUpdate(s"DBCC CHECKIDENT('$newJournalTableName', RESEED, $reset)")
+ }(db)
+ }
+
+ override def beforeAll(): Unit = {
+ dropAndCreate(SqlServer)
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ dropAndCreate(SqlServer)
+ super.afterAll()
+ }
+
+ override def beforeEach(): Unit = {
+ clearSqlServer()
+ super.beforeEach()
+ }
+ }
+
+ trait H2Cleaner extends MigratorSpec {
+
+ def clearH2(): Unit = {
+ tables.foreach { name =>
+ withStatement(stmt => stmt.executeUpdate(s"DELETE FROM $name"))(db)
+ }
+ }
+
+ override def beforeEach(): Unit = {
+ dropAndCreate(H2)
+ super.beforeEach()
+ }
+ }
+}
diff --git a/migrator/src/test/scala/akka/persistence/jdbc/migrator/SnapshotMigratorTest.scala b/migrator/src/test/scala/akka/persistence/jdbc/migrator/SnapshotMigratorTest.scala
new file mode 100644
index 000000000..8682cf9ae
--- /dev/null
+++ b/migrator/src/test/scala/akka/persistence/jdbc/migrator/SnapshotMigratorTest.scala
@@ -0,0 +1,50 @@
+package akka.persistence.jdbc.migrator
+
+import akka.Done
+import akka.pattern.ask
+import akka.persistence.jdbc.db.SlickDatabase
+import akka.persistence.jdbc.migrator.MigratorSpec._
+
+abstract class SnapshotMigratorTest(configName: String) extends MigratorSpec(configName) {
+
+ it should "migrate snapshots" in {
+ withLegacyActorSystem { implicit systemLegacy =>
+ withReadJournal { implicit readJournal =>
+ withTestActors() { (actorA1, actorA2, actorA3) =>
+ (actorA1 ? CreateAccount(1)).futureValue
+ (actorA2 ? CreateAccount(1)).futureValue
+ (actorA3 ? CreateAccount(1)).futureValue
+ for (_ <- 1 to 99) {
+ (actorA1 ? Deposit(1)).futureValue
+ (actorA2 ? Deposit(1)).futureValue
+ (actorA3 ? Deposit(1)).futureValue
+ }
+ eventually {
+ (actorA1 ? State).mapTo[Int].futureValue shouldBe 100
+ (actorA2 ? State).mapTo[Int].futureValue shouldBe 100
+ (actorA3 ? State).mapTo[Int].futureValue shouldBe 100
+ countJournal().futureValue shouldBe 300
+ }
+ }
+ }
+ } // legacy persistence
+ withActorSystem { implicit systemNew =>
+ withReadJournal { implicit readJournal =>
+ eventually {
+ countJournal().futureValue shouldBe 0 // before migration
+ SnapshotMigrator(SlickDatabase.profile(config, "slick")).migrateAll().futureValue shouldBe Done
+ countJournal().futureValue shouldBe 0 // after migration
+ }
+ withTestActors() { (actorB1, actorB2, actorB3) =>
+ eventually {
+ (actorB1 ? State).mapTo[Int].futureValue shouldBe 100
+ (actorB2 ? State).mapTo[Int].futureValue shouldBe 100
+ (actorB3 ? State).mapTo[Int].futureValue shouldBe 100
+ }
+ }
+ }
+ } // new persistence
+ }
+}
+
+class H2SnapshotMigratorTest extends SnapshotMigratorTest("h2-application.conf") with MigratorSpec.H2Cleaner
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index f78c983e3..800a18682 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -1,5 +1,4 @@
import sbt._
-import Keys._
object Dependencies {
val Nightly = sys.env.get("TRAVIS_EVENT_TYPE").contains("cron")
@@ -33,7 +32,6 @@ object Dependencies {
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test) ++ JdbcDrivers.map(_ % Test)
val Migration: Seq[ModuleID] = Seq(
- "org.flywaydb" % "flyway-core" % "7.14.0",
"com.typesafe" % "config" % "1.4.1",
"ch.qos.logback" % "logback-classic" % "1.2.6",
"org.testcontainers" % "postgresql" % "1.16.0" % Test,