diff --git a/build.sbt b/build.sbt
index 56c4a774..54577bdb 100644
--- a/build.sbt
+++ b/build.sbt
@@ -160,7 +160,7 @@ lazy val docs = project
"scaladoc.akka.base_url" -> s"https://doc.akka.io/api/akka/${Dependencies.AkkaVersionInDocs}/",
"javadoc.akka.base_url" -> s"https://doc.akka.io/japi/akka/${Dependencies.AkkaVersionInDocs}/",
"scaladoc.com.typesafe.config.base_url" -> s"https://lightbend.github.io/config/latest/api/",
- "sqlserver.version" -> Dependencies.SqlServerVersion),
+ "sqlserver.version" -> Dependencies.SqlServerR2dbcVersion),
ApidocPlugin.autoImport.apidocRootPackage := "akka",
apidocRootPackage := "akka",
resolvers += Resolver.jcenterRepo,
diff --git a/docs/src/main/paradox/migration.md b/docs/src/main/paradox/migration.md
index 1fd36b3b..2836ef38 100644
--- a/docs/src/main/paradox/migration.md
+++ b/docs/src/main/paradox/migration.md
@@ -38,7 +38,8 @@ re-run the full migration.
It's recommended that you create the `migration_progress` table before running the migration tool, but
if it doesn't exist the tool will try to create the table.
-```sql
+Postgres:
+: ```sql
CREATE TABLE IF NOT EXISTS migration_progress(
persistence_id VARCHAR(255) NOT NULL,
event_seq_nr BIGINT,
@@ -47,6 +48,23 @@ CREATE TABLE IF NOT EXISTS migration_progress(
PRIMARY KEY(persistence_id)
```
+SQLServer:
+: ```sql
+IF object_id('migration_progress') is null
+ CREATE TABLE migration_progress(
+ persistence_id NVARCHAR(255) NOT NULL,
+ event_seq_nr BIGINT,
+ snapshot_seq_nr BIGINT,
+ state_revision BIGINT,
+  PRIMARY KEY(persistence_id))
+```
+
+@@@ warning { .group-sqlserver }
+
+The SQL Server dialect is marked `experimental` and is not considered production ready until various [issues](https://github.com/akka/akka-persistence-r2dbc/issues?q=is%3Aopen+label%3Asqlserver+label%3Abug) with the integration of the `r2dbc-mssql` plugin have been resolved.
+
+@@@
+
## Running
The migration tool can be run as main class `akka.persistence.r2dbc.migration.MigrationTool` provided by the above
@@ -60,9 +78,13 @@ Durable State is not migrated by `MigrationTool.migrateAll`, instead you need to
## Configuration
-You need to provide configuration for the source persistence plugin and the target Rd2BC plugin in your `application.conf`. An example of such configuration for migration from Akka Persistence JDBC:
+You need to provide configuration for the source persistence plugin and the target R2DBC plugin in your `application.conf`. An example of such configuration for migration from Akka Persistence JDBC:
+
+Postgres:
+: @@snip [application-postgres.conf](/migration-tests/src/test/resources/application-postgres-example.conf)
-@@snip [application-postgres.conf](/migration-tests/src/test/resources/application-postgres.conf)
+SQLServer:
+: @@snip [application-sqlserver.conf](/migration-tests/src/test/resources/application-sqlserver-example.conf)
@@@ note
diff --git a/migration-tests/src/test/resources/application-sqlserver-example.conf b/migration-tests/src/test/resources/application-sqlserver-example.conf
new file mode 100644
index 00000000..ade91a0b
--- /dev/null
+++ b/migration-tests/src/test/resources/application-sqlserver-example.conf
@@ -0,0 +1,45 @@
+akka.persistence.r2dbc.migration {
+ source {
+ query-plugin-id = "jdbc-read-journal"
+ snapshot-plugin-id = "jdbc-snapshot-store"
+ }
+}
+
+akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.sqlserver}
+akka.persistence.r2dbc.connection-factory {
+ host = "localhost"
+ port = 1433
+ database = "your_db"
+ user = "your_user"
+ password = "your_password"
+}
+
+akka-persistence-jdbc {
+ shared-databases {
+ default {
+ profile = "slick.jdbc.SQLServerProfile$"
+ db {
+        url = "jdbc:sqlserver://127.0.0.1:1433;databaseName=master;integratedSecurity=false;"
+ user = "user"
+ password = "password"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ numThreads = 5
+ maxConnections = 5
+ minConnections = 1
+ }
+ }
+ }
+}
+
+jdbc-journal {
+ use-shared-db = "default"
+}
+jdbc-snapshot-store {
+ use-shared-db = "default"
+}
+jdbc-read-journal {
+ use-shared-db = "default"
+}
+
+# application specific serializers for events and snapshots
+# must also be configured and included in classpath
diff --git a/migration-tests/src/test/resources/application-sqlserver.conf b/migration-tests/src/test/resources/application-sqlserver.conf
new file mode 100644
index 00000000..524cf3da
--- /dev/null
+++ b/migration-tests/src/test/resources/application-sqlserver.conf
@@ -0,0 +1 @@
+akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.sqlserver}
\ No newline at end of file
diff --git a/migration-tests/src/test/resources/logback-main.xml b/migration-tests/src/test/resources/logback-main.xml
index a3eaf119..4d28c4de 100644
--- a/migration-tests/src/test/resources/logback-main.xml
+++ b/migration-tests/src/test/resources/logback-main.xml
@@ -18,4 +18,4 @@
-
+
\ No newline at end of file
diff --git a/migration-tests/src/test/resources/logback-test.xml b/migration-tests/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..57820d28
--- /dev/null
+++ b/migration-tests/src/test/resources/logback-test.xml
@@ -0,0 +1,24 @@
+
+
+
+
+
+ [%date{ISO8601}] [%level] [%logger] [%X{akkaAddress}] [%marker] [%thread] - %msg%n
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala
index 450b3219..c4cceeff 100644
--- a/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala
+++ b/migration-tests/src/test/scala/akka/persistence/r2dbc/migration/MigrationToolSpec.scala
@@ -21,25 +21,51 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.persistence.r2dbc.TestActors.DurableStatePersister
+import akka.persistence.r2dbc.migration.MigrationToolSpec.dialect
object MigrationToolSpec {
- val config: Config = ConfigFactory
- .parseString("""
+
+ private val testConfig = TestConfig.config
+
+ private val dialect = testConfig.getString("akka.persistence.r2dbc.connection-factory.dialect")
+ private val dbProfile = if (dialect == "sqlserver") {
+ """
+ default {
+ profile = "slick.jdbc.SQLServerProfile$"
+ db {
+          url = "jdbc:sqlserver://127.0.0.1:1433;databaseName=master;integratedSecurity=false;"
+ user = "SA"
+ password = ""
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ numThreads = 5
+ maxConnections = 5
+ minConnections = 1
+ }
+ }
+ """
+ } else {
+ """
+ default {
+ profile = "slick.jdbc.PostgresProfile$"
+ db {
+ host = "127.0.0.1"
+ url = "jdbc:postgresql://127.0.0.1:5432/postgres?reWriteBatchedInserts=true"
+ user = postgres
+ password = postgres
+ driver = "org.postgresql.Driver"
+ numThreads = 20
+ maxConnections = 20
+ minConnections = 5
+ }
+ }
+ """
+ }
+
+ private val config: Config = ConfigFactory
+ .parseString(s"""
akka-persistence-jdbc {
shared-databases {
- default {
- profile = "slick.jdbc.PostgresProfile$"
- db {
- host = "127.0.0.1"
- url = "jdbc:postgresql://127.0.0.1:5432/postgres?reWriteBatchedInserts=true"
- user = postgres
- password = postgres
- driver = "org.postgresql.Driver"
- numThreads = 20
- maxConnections = 20
- minConnections = 5
- }
- }
+ $dbProfile
}
}
@@ -62,7 +88,8 @@ object MigrationToolSpec {
akka.persistence.r2dbc.state.assert-single-writer = off
""")
- .withFallback(TestConfig.config)
+ .withFallback(testConfig)
+
}
class MigrationToolSpec
@@ -83,10 +110,89 @@ class MigrationToolSpec
private val migration = new MigrationTool(system)
- private val testEnabled: Boolean = {
- // don't run this for Yugabyte since it is using akka-persistence-jdbc
- system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect") == "postgres"
- }
+ // don't run this for Yugabyte since it is using akka-persistence-jdbc
+ private val postgresTest = dialect == "postgres"
+ private val sqlServerTest = dialect == "sqlserver"
+ private val testEnabled = postgresTest || sqlServerTest
+
+ private val createJournalTablePostgres =
+ """CREATE TABLE IF NOT EXISTS jdbc_event_journal(
+ | ordering BIGSERIAL,
+ | persistence_id VARCHAR(255) NOT NULL,
+ | sequence_number BIGINT NOT NULL,
+ | deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ |
+ | writer VARCHAR(255) NOT NULL,
+ | write_timestamp BIGINT,
+ | adapter_manifest VARCHAR(255),
+ |
+ | event_ser_id INTEGER NOT NULL,
+ | event_ser_manifest VARCHAR(255) NOT NULL,
+ | event_payload BYTEA NOT NULL,
+ |
+ | meta_ser_id INTEGER,
+ | meta_ser_manifest VARCHAR(255),
+ | meta_payload BYTEA,
+ |
+ | PRIMARY KEY(persistence_id, sequence_number)
+ |)""".stripMargin
+
+ private val createJournalTableSqlServer =
+ """IF object_id('jdbc_event_journal') is null
+ |CREATE TABLE "jdbc_event_journal" (
+ | "ordering" BIGINT IDENTITY(1,1) NOT NULL,
+ | "deleted" BIT DEFAULT 0 NOT NULL,
+ | "persistence_id" NVARCHAR(255) NOT NULL,
+ | "sequence_number" NUMERIC(10,0) NOT NULL,
+ | "writer" NVARCHAR(255) NOT NULL,
+ | "write_timestamp" BIGINT NOT NULL,
+ | "adapter_manifest" NVARCHAR(MAX) NOT NULL,
+ | "event_payload" VARBINARY(MAX) NOT NULL,
+ | "event_ser_id" INTEGER NOT NULL,
+ | "event_ser_manifest" NVARCHAR(MAX) NOT NULL,
+ | "meta_payload" VARBINARY(MAX),
+ | "meta_ser_id" INTEGER,
+      |  "meta_ser_manifest" NVARCHAR(MAX),
+ | PRIMARY KEY ("persistence_id", "sequence_number")
+ |)""".stripMargin
+
+ private val createSnapshotTablePostgres =
+ """CREATE TABLE IF NOT EXISTS jdbc_snapshot (
+ | persistence_id VARCHAR(255) NOT NULL,
+ | sequence_number BIGINT NOT NULL,
+ | created BIGINT NOT NULL,
+ |
+ | snapshot_ser_id INTEGER NOT NULL,
+ | snapshot_ser_manifest VARCHAR(255) NOT NULL,
+ | snapshot_payload BYTEA NOT NULL,
+ |
+ | meta_ser_id INTEGER,
+ | meta_ser_manifest VARCHAR(255),
+ | meta_payload BYTEA,
+ |
+ | PRIMARY KEY(persistence_id, sequence_number)
+ |)""".stripMargin
+
+ private val createSnapshotTableSqlServer =
+ """IF object_id('jdbc_snapshot') is null
+ |CREATE TABLE "jdbc_snapshot" (
+ | "persistence_id" NVARCHAR(255) NOT NULL,
+ | "sequence_number" NUMERIC(10,0) NOT NULL,
+ | "created" BIGINT NOT NULL,
+ | "snapshot_ser_id" INTEGER NOT NULL,
+ | "snapshot_ser_manifest" NVARCHAR(255) NOT NULL,
+ | "snapshot_payload" VARBINARY(MAX) NOT NULL,
+ | "meta_ser_id" INTEGER,
+ | "meta_ser_manifest" NVARCHAR(255),
+ | "meta_payload" VARBINARY(MAX),
+ | PRIMARY KEY ("persistence_id", "sequence_number")
+ | )
+ |""".stripMargin
+
+ private val createJournalTableSql =
+ if (dialect == "sqlserver") createJournalTableSqlServer else createJournalTablePostgres
+ private val createSnapshotTableSql =
+ if (dialect == "sqlserver") createSnapshotTableSqlServer else createSnapshotTablePostgres
override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -94,64 +200,33 @@ class MigrationToolSpec
if (testEnabled) {
Await.result(
r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection =>
- connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_event_journal(
- | ordering BIGSERIAL,
- | persistence_id VARCHAR(255) NOT NULL,
- | sequence_number BIGINT NOT NULL,
- | deleted BOOLEAN DEFAULT FALSE NOT NULL,
- |
- | writer VARCHAR(255) NOT NULL,
- | write_timestamp BIGINT,
- | adapter_manifest VARCHAR(255),
- |
- | event_ser_id INTEGER NOT NULL,
- | event_ser_manifest VARCHAR(255) NOT NULL,
- | event_payload BYTEA NOT NULL,
- |
- | meta_ser_id INTEGER,
- | meta_ser_manifest VARCHAR(255),
- | meta_payload BYTEA,
- |
- | PRIMARY KEY(persistence_id, sequence_number)
- |)""".stripMargin)
+ connection.createStatement(createJournalTableSql)
},
10.seconds)
Await.result(
r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection =>
- connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_snapshot (
- | persistence_id VARCHAR(255) NOT NULL,
- | sequence_number BIGINT NOT NULL,
- | created BIGINT NOT NULL,
- |
- | snapshot_ser_id INTEGER NOT NULL,
- | snapshot_ser_manifest VARCHAR(255) NOT NULL,
- | snapshot_payload BYTEA NOT NULL,
- |
- | meta_ser_id INTEGER,
- | meta_ser_manifest VARCHAR(255),
- | meta_payload BYTEA,
- |
- | PRIMARY KEY(persistence_id, sequence_number)
- |)""".stripMargin)
+ connection.createStatement(createSnapshotTableSql)
},
10.seconds)
- Await.result(
- r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection =>
- connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_durable_state (
- | global_offset BIGSERIAL,
- | persistence_id VARCHAR(255) NOT NULL,
- | revision BIGINT NOT NULL,
- | state_payload BYTEA NOT NULL,
- | state_serial_id INTEGER NOT NULL,
- | state_serial_manifest VARCHAR(255),
- | tag VARCHAR,
- | state_timestamp BIGINT NOT NULL,
- | PRIMARY KEY(persistence_id)
- |);""".stripMargin)
- },
- 10.seconds)
+ if (postgresTest) {
+ Await.result(
+ r2dbcExecutor.executeDdl("beforeAll create jdbc tables") { connection =>
+ connection.createStatement("""CREATE TABLE IF NOT EXISTS jdbc_durable_state (
+ | global_offset BIGSERIAL,
+ | persistence_id VARCHAR(255) NOT NULL,
+ | revision BIGINT NOT NULL,
+ | state_payload BYTEA NOT NULL,
+ | state_serial_id INTEGER NOT NULL,
+ | state_serial_manifest VARCHAR(255),
+ | tag VARCHAR,
+ | state_timestamp BIGINT NOT NULL,
+ | PRIMARY KEY(persistence_id)
+ |);""".stripMargin)
+ },
+ 10.seconds)
+ }
Await.result(
r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_event_journal")),
@@ -159,9 +234,11 @@ class MigrationToolSpec
Await.result(
r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_snapshot")),
10.seconds)
- Await.result(
- r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_durable_state")),
- 10.seconds)
+ if (postgresTest) {
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete jdbc")(_.createStatement("delete from jdbc_durable_state")),
+ 10.seconds)
+ }
Await.result(migration.migrationDao.createProgressTable(), 10.seconds)
Await.result(
@@ -211,10 +288,10 @@ class MigrationToolSpec
probe.expectMessage(Done)
}
- "MigrationTool" should {
+ "MigrationTool Events" should {
if (!testEnabled) {
info(
- s"MigrationToolSpec not enabled for ${system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")}")
+ s"MigrationToolSpec (Events) not enabled for ${system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")}")
pending
}
@@ -321,6 +398,16 @@ class MigrationToolSpec
}
}
+ }
+
+ "MigrationTool State" should {
+
+ if (!postgresTest) {
+ info(
+ s"MigrationToolSpec (State) not enabled for ${system.settings.config.getString("akka.persistence.r2dbc.connection-factory.dialect")}")
+ pending
+ }
+
"migrate durable state of one persistenceId" in {
val pid = PersistenceId.ofUniqueId(nextPid())
persistDurableState(pid, "s-1")
@@ -373,6 +460,6 @@ class MigrationToolSpec
assertDurableState(pid, s"s-$pid")
}
}
-
}
+
}
diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala
index 0ff25b84..fde26282 100644
--- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala
+++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala
@@ -158,7 +158,13 @@ class MigrationTool(system: ActorSystem[_]) {
if (targetR2dbcSettings.dialectName == "h2") {
log.error("Migrating to H2 using the migration tool not currently supported")
}
- private[r2dbc] val migrationDao = new MigrationToolDao(targetExecutorProvider)
+ private[r2dbc] val migrationDao = {
+ targetR2dbcSettings.dialectName match {
+ case "sqlserver" => new SqlServerMigrationToolDao(targetExecutorProvider)
+ case _ => new MigrationToolDao(targetExecutorProvider)
+ }
+ }
private lazy val createProgressTable: Future[Done] =
migrationDao.createProgressTable()
diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala
index 0db12e59..a42230ea 100644
--- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala
+++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala
@@ -11,11 +11,12 @@ import akka.Done
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
+import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
import akka.persistence.r2dbc.internal.codec.IdentityAdapter
import akka.persistence.r2dbc.internal.codec.QueryAdapter
import org.slf4j.LoggerFactory
-
+import io.r2dbc.spi.Statement
import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
/**
@@ -38,36 +39,51 @@ import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
ec: ExecutionContext,
system: ActorSystem[_]) {
import MigrationToolDao._
- implicit val queryAdapter: QueryAdapter = IdentityAdapter
+
+ protected val settings: R2dbcSettings = executorProvider.settings
+ import settings.codecSettings.JournalImplicits._
+
// progress always in data partition 0
private val r2dbcExecutor = executorProvider.executorFor(slice = 0)
+ protected def createMigrationProgressTableSql(): String = {
+ sql"""
+ CREATE TABLE IF NOT EXISTS migration_progress(
+ persistence_id VARCHAR(255) NOT NULL,
+ event_seq_nr BIGINT,
+ snapshot_seq_nr BIGINT,
+ state_revision BIGINT,
+ PRIMARY KEY(persistence_id)
+ )"""
+ }
+
def createProgressTable(): Future[Done] = {
r2dbcExecutor.executeDdl("create migration progress table") { connection =>
- connection.createStatement(sql"""
- CREATE TABLE IF NOT EXISTS migration_progress(
- persistence_id VARCHAR(255) NOT NULL,
- event_seq_nr BIGINT,
- snapshot_seq_nr BIGINT,
- state_revision BIGINT,
- PRIMARY KEY(persistence_id)
- )""")
+ connection.createStatement(createMigrationProgressTableSql())
}
}
+ protected def baseUpsertSql(column: String): String = {
+ sql"""
+ INSERT INTO migration_progress
+ (persistence_id, $column)
+ VALUES (?, ?)
+ ON CONFLICT (persistence_id)
+ DO UPDATE SET
+ $column = excluded.$column"""
+ }
+
+ protected def bindBaseUpsertSql(stmt: Statement, persistenceId: String, seqNr: Long): Statement = {
+ stmt
+ .bind(0, persistenceId)
+ .bind(1, seqNr)
+ }
+
def updateEventProgress(persistenceId: String, seqNr: Long): Future[Done] = {
r2dbcExecutor
.updateOne(s"upsert migration progress [$persistenceId]") { connection =>
- connection
- .createStatement(sql"""
- INSERT INTO migration_progress
- (persistence_id, event_seq_nr)
- VALUES (?, ?)
- ON CONFLICT (persistence_id)
- DO UPDATE SET
- event_seq_nr = excluded.event_seq_nr""")
- .bind(0, persistenceId)
- .bind(1, seqNr)
+ val stmt = connection.createStatement(baseUpsertSql("event_seq_nr"))
+ bindBaseUpsertSql(stmt, persistenceId, seqNr)
}
.map(_ => Done)(ExecutionContexts.parasitic)
}
@@ -75,16 +91,8 @@ import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
def updateSnapshotProgress(persistenceId: String, seqNr: Long): Future[Done] = {
r2dbcExecutor
.updateOne(s"upsert migration progress [$persistenceId]") { connection =>
- connection
- .createStatement(sql"""
- INSERT INTO migration_progress
- (persistence_id, snapshot_seq_nr)
- VALUES (?, ?)
- ON CONFLICT (persistence_id)
- DO UPDATE SET
- snapshot_seq_nr = excluded.snapshot_seq_nr""")
- .bind(0, persistenceId)
- .bind(1, seqNr)
+ val stmt = connection.createStatement(baseUpsertSql("snapshot_seq_nr"))
+ bindBaseUpsertSql(stmt, persistenceId, seqNr)
}
.map(_ => Done)(ExecutionContexts.parasitic)
}
@@ -92,16 +100,8 @@ import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
def updateDurableStateProgress(persistenceId: String, revision: Long): Future[Done] = {
r2dbcExecutor
.updateOne(s"upsert migration progress [$persistenceId]") { connection =>
- connection
- .createStatement(sql"""
- INSERT INTO migration_progress
- (persistence_id, state_revision)
- VALUES (?, ?)
- ON CONFLICT (persistence_id)
- DO UPDATE SET
- state_revision = excluded.state_revision""")
- .bind(0, persistenceId)
- .bind(1, revision)
+ val stmt = connection.createStatement(baseUpsertSql("state_revision"))
+ bindBaseUpsertSql(stmt, persistenceId, revision)
}
.map(_ => Done)(ExecutionContexts.parasitic)
}
diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala
new file mode 100644
index 00000000..4df347d7
--- /dev/null
+++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/SqlServerMigrationToolDao.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2022 - 2023 Lightbend Inc.
+ */
+
+package akka.persistence.r2dbc.migration
+
+import scala.concurrent.ExecutionContext
+
+import akka.actor.typed.ActorSystem
+import akka.annotation.InternalApi
+import akka.persistence.r2dbc.internal.R2dbcExecutorProvider
+import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter
+import io.r2dbc.spi.Statement
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[r2dbc] class SqlServerMigrationToolDao(executorProvider: R2dbcExecutorProvider)(implicit
+ ec: ExecutionContext,
+ system: ActorSystem[_])
+ extends MigrationToolDao(executorProvider) {
+
+ import settings.codecSettings.JournalImplicits._
+
+ override protected def createMigrationProgressTableSql(): String = {
+ sql"""
+ IF object_id('migration_progress') is null
+ CREATE TABLE migration_progress(
+ persistence_id NVARCHAR(255) NOT NULL,
+ event_seq_nr BIGINT,
+ snapshot_seq_nr BIGINT,
+ state_revision BIGINT,
+ PRIMARY KEY(persistence_id)
+ )"""
+ }
+
+  override protected def baseUpsertSql(column: String): String = {
+ sql"""
+ UPDATE migration_progress SET
+ $column = @bindColumn
+ WHERE persistence_id = @persistenceId
+ if @@ROWCOUNT = 0
+ INSERT INTO migration_progress
+ (persistence_id, $column)
+ VALUES(@persistenceId, @bindColumn)
+
+ """
+ }
+
+ // necessary, otherwise we would need to bind both columns multiple times
+ override protected def bindBaseUpsertSql(stmt: Statement, persistenceId: String, columnValue: Long): Statement = {
+ stmt
+ .bind("@persistenceId", persistenceId)
+ .bind("@bindColumn", columnValue)
+ }
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index ff7a0e44..7fcdc2ab 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -15,7 +15,8 @@ object Dependencies {
val AkkaProjectionVersionInDocs = "current"
val H2Version = "2.2.224"
val R2dbcH2Version = "1.0.0.RELEASE"
- val SqlServerVersion = "1.0.2.RELEASE"
+ val SqlServerR2dbcVersion = "1.0.2.RELEASE"
+ val SqlServerJdbcVersion = "7.4.1.jre8"
object Compile {
val akkaActorTyped = "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion
@@ -30,7 +31,7 @@ object Dependencies {
val h2 = "com.h2database" % "h2" % H2Version % Provided // EPL 1.0
val r2dbcH2 = "io.r2dbc" % "r2dbc-h2" % R2dbcH2Version % Provided // ApacheV2
- val r2dbcSqlServer = "io.r2dbc" % "r2dbc-mssql" % SqlServerVersion % Provided // ApacheV2
+ val r2dbcSqlServer = "io.r2dbc" % "r2dbc-mssql" % SqlServerR2dbcVersion % Provided // ApacheV2
}
object TestDeps {
@@ -72,6 +73,7 @@ object Dependencies {
val migrationTests =
Seq(
"com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion % Test,
+ "com.microsoft.sqlserver" % "mssql-jdbc" % SqlServerJdbcVersion % Test,
TestDeps.postgresql,
TestDeps.logback,
TestDeps.scalaTest,