From 45a2bc4b807975a013208427bec045dfca0e171c Mon Sep 17 00:00:00 2001
From: Scala Steward <43047562+scala-steward@users.noreply.github.com>
Date: Mon, 16 Sep 2024 08:52:21 +0200
Subject: [PATCH 1/4] bump: sbt-ci-release 1.6.1 (was 1.6.0) (#597)

---
 project/plugins.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/plugins.sbt b/project/plugins.sbt
index 68956034..33ceee5d 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -6,7 +6,7 @@ addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0")
 addDependencyTreePlugin

 // for releasing
-addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.6.0")
+addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.6.1")

 //// docs
 addSbtPlugin("com.lightbend.paradox" % "sbt-paradox-dependencies" % "0.2.4")

From 96f2fe5599152e20fa1afd458794a5de99a699d7 Mon Sep 17 00:00:00 2001
From: Enno Runne <458526+ennru@users.noreply.github.com>
Date: Mon, 23 Sep 2024 14:05:27 +0200
Subject: [PATCH 2/4] docs: revise URL structure (libraries) (#601)

---
 README.md                                       |  4 ++--
 build.sbt                                       | 14 +++++++-------
 core/src/main/resources/reference.conf          |  2 +-
 .../akka/persistence/r2dbc/R2dbcSettings.scala  |  4 ++--
 .../r2dbc/query/javadsl/R2dbcReadJournal.scala  |  4 ++--
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala |  6 +++---
 docs/release-train-issue-template.md            |  6 +++---
 .../r2dbc/migration/MigrationTool.scala         |  2 +-
 8 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/README.md b/README.md
index 888e99b6..bfe38097 100644
--- a/README.md
+++ b/README.md
@@ -2,12 +2,12 @@

 The Akka family of projects is managed by teams at [Lightbend](https://lightbend.com/) with help from the community.

-[Akka Persistence](https://doc.akka.io/docs/akka/current/scala/persistence.html) journal and snapshot
+[Akka Persistence](https://doc.akka.io/libraries/akka-core/current/scala/persistence.html) journal and snapshot
 store for SQL databases with R2DBC connectivity.

 For questions please use the [discuss.akka.io](https://discuss.akka.io). Tag any new questions with `akka-persistence` and `r2dbc`.
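The URL changes in this patch follow one mechanical rule: `https://doc.akka.io/docs/<module>/…` becomes `https://doc.akka.io/libraries/<module>/…`, and links to Akka core additionally move from the module name `akka` to `akka-core`. A minimal Scala sketch of that rule (the object and method names are illustrative, not part of the patch):

```scala
object DocUrlMigration {
  // The specific akka -> akka-core rule must run before the generic
  // docs/ -> libraries/ prefix swap, otherwise core links would land
  // under libraries/akka/ instead of libraries/akka-core/.
  def migrate(url: String): String =
    url
      .replace("https://doc.akka.io/docs/akka/", "https://doc.akka.io/libraries/akka-core/")
      .replace("https://doc.akka.io/docs/", "https://doc.akka.io/libraries/")

  def main(args: Array[String]): Unit =
    // prints https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html
    println(migrate("https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html"))
}
```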
-The documentation can be found [here](https://doc.akka.io/docs/akka-persistence-r2dbc/current/index.html) +The documentation can be found [here](https://doc.akka.io/libraries/akka-persistence-r2dbc/current/index.html) ## Project status diff --git a/build.sbt b/build.sbt index fa9e6026..8062d27b 100644 --- a/build.sbt +++ b/build.sbt @@ -10,7 +10,7 @@ inThisBuild( Seq( organization := "com.lightbend.akka", organizationName := "Lightbend Inc.", - homepage := Some(url("https://doc.akka.io/docs/akka-persistence-r2dbc/current")), + homepage := Some(url("https://doc.akka.io/libraries/akka-persistence-r2dbc/current")), scmInfo := Some( ScmInfo( url("https://github.com/akka/akka-persistence-r2dbc"), @@ -137,21 +137,21 @@ lazy val docs = project previewPath := (Paradox / siteSubdirName).value, Preprocess / siteSubdirName := s"api/akka-persistence-r2dbc/${projectInfoVersion.value}", Preprocess / sourceDirectory := (LocalRootProject / ScalaUnidoc / unidoc / target).value, - Paradox / siteSubdirName := s"docs/akka-persistence-r2dbc/${projectInfoVersion.value}", + Paradox / siteSubdirName := s"libraries/akka-persistence-r2dbc/${projectInfoVersion.value}", paradoxGroups := Map( "Language" -> Seq("Java", "Scala"), "Dialect" -> Seq("Postgres", "Yugabyte", "H2", "SQLServer")), Compile / paradoxProperties ++= Map( - "project.url" -> "https://doc.akka.io/docs/akka-persistence-r2dbc/current/", - "canonical.base_url" -> "https://doc.akka.io/docs/akka-persistence-r2dbc/current", + "project.url" -> "https://doc.akka.io/libraries/akka-persistence-r2dbc/current/", + "canonical.base_url" -> "https://doc.akka.io/libraries/akka-persistence-r2dbc/current", "akka.version" -> Dependencies.AkkaVersion, "h2.version" -> Dependencies.H2Version, "r2dbc-h2.version" -> Dependencies.R2dbcH2Version, "scala.version" -> scalaVersion.value, "scala.binary.version" -> scalaBinaryVersion.value, - "extref.akka.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s", - "extref.akka-docs.base_url" -> s"https://doc.akka.io/docs/akka/${Dependencies.AkkaVersionInDocs}/%s", - "extref.akka-projection.base_url" -> s"https://doc.akka.io/docs/akka-projection/${Dependencies.AkkaProjectionVersionInDocs}/%s", + "extref.akka.base_url" -> s"https://doc.akka.io/libraries/akka-core/${Dependencies.AkkaVersionInDocs}/%s", + "extref.akka-docs.base_url" -> s"https://doc.akka.io/libraries/akka-core/${Dependencies.AkkaVersionInDocs}/%s", + "extref.akka-projection.base_url" -> s"https://doc.akka.io/libraries/akka-projection/${Dependencies.AkkaProjectionVersionInDocs}/%s", "extref.java-docs.base_url" -> "https://docs.oracle.com/en/java/javase/11/%s", "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/current/", "scaladoc.akka.persistence.r2dbc.base_url" -> s"/${(Preprocess / siteSubdirName).value}/", diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index c9867425..a739df7b 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -164,7 +164,7 @@ akka.persistence.r2dbc { # currentEventsBySlicesStartingFromSnapshots are used. That has a small overhead when storing # snapshots because the timestamp and tags of the corresponding event is retrieved when storing # a snapshot. 
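    # For example (illustrative comment, not part of this patch), starting
    # queries from snapshots requires:
    #   akka.persistence.r2dbc.query.start-from-snapshot.enabled = true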
- # See also https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots + # See also https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots enabled = false } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index 6ac03e9c..05d7e746 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -36,7 +36,7 @@ object R2dbcSettings { "Database dialect config has moved from 'akka.persistence.r2dbc.dialect' into the connection-factory block, " + "the old 'dialect' config entry must be removed, " + "see akka-persistence-r2dbc documentation for details on the new configuration scheme: " + - "https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html") + "https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html") } val schema: Option[String] = Option(config.getString("schema")).filterNot(_.trim.isEmpty) @@ -102,7 +102,7 @@ object R2dbcSettings { "The Akka Persistence R2DBC database config scheme has changed, the config needs to be updated " + "to choose database dialect using the connection-factory block, " + "see akka-persistence-r2dbc documentation for details on the new configuration scheme: " + - "https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html") + "https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html") } Vector(ConnectionFactorySettings(config.getConfig("connection-factory"))) } else { diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala index 342e2692..e49c258b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala @@ -117,7 +117,7 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) * * To use `currentEventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide - * https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots + * https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots */ override def currentEventsBySlicesStartingFromSnapshots[Snapshot, Event]( entityType: String, @@ -141,7 +141,7 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) * * To use `eventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide - * https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots + * https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots */ override def eventsBySlicesStartingFromSnapshots[Snapshot, Event]( entityType: String, diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index e47a9779..5dfc49b5 100644 --- 
a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -271,7 +271,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat * * To use `currentEventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide - * https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots + * https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots */ override def currentEventsBySlicesStartingFromSnapshots[Snapshot, Event]( entityType: String, @@ -328,7 +328,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat * * To use `eventsBySlicesStartingFromSnapshots` you must enable configuration * `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in migration guide - * https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots + * https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots */ override def eventsBySlicesStartingFromSnapshots[Snapshot, Event]( entityType: String, @@ -417,7 +417,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat throw new IllegalArgumentException( s"To use $methodName you must enable " + "configuration `akka.persistence.r2dbc.query.start-from-snapshot.enabled` and follow instructions in " + - "migration guide https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots") + "migration guide https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots") private def eventsBySlicesPubSubSource[Event]( entityType: String, diff --git a/docs/release-train-issue-template.md b/docs/release-train-issue-template.md index 438c3e12..ee663b0a 100644 --- a/docs/release-train-issue-template.md +++ b/docs/release-train-issue-template.md @@ -30,7 +30,7 @@ Key links: ### Check availability - [ ] Check [API](https://doc.akka.io/api/akka-persistence-r2dbc/$VERSION$/) documentation -- [ ] Check [reference](https://doc.akka.io/docs/akka-persistence-r2dbc/$VERSION$/) documentation. Check that the reference docs were deployed and show a version warning (see section below on how to fix the version warning). +- [ ] Check [reference](https://doc.akka.io/libraries/akka-persistence-r2dbc/$VERSION$/) documentation. Check that the reference docs were deployed and show a version warning (see section below on how to fix the version warning). 
- [ ] Check the release on https://repo.akka.io/maven/com/lightbend/akka/akka-persistence-r2dbc_2.13/$VERSION$/akka-persistence-r2dbc_2.13-$VERSION$.pom ### When everything is on https://repo.akka.io/maven @@ -40,7 +40,7 @@ Key links: ``` cd ~/www git status - git add docs/akka-persistence-r2dbc/current docs/akka-persistence-r2dbc/$VERSION$ + git add libraries/akka-persistence-r2dbc/current libraries/akka-persistence-r2dbc/$VERSION$ git add api/akka-persistence-r2dbc/current api/akka-persistence-r2dbc/$VERSION$ git commit -m "Akka Persistence R2DBC $VERSION$" ``` @@ -60,6 +60,6 @@ For minor or major releases: ### Afterwards -- [ ] Update [akka-dependencies bom](https://github.com/lightbend/akka-dependencies) and version for [Akka module versions](https://doc.akka.io/docs/akka-dependencies/current/) in [akka-dependencies repo](https://github.com/akka/akka-dependencies) +- [ ] Update [akka-dependencies bom](https://github.com/lightbend/akka-dependencies) and version for [Akka module versions](https://doc.akka.io/libraries/akka-dependencies/current/) in [akka-dependencies repo](https://github.com/akka/akka-dependencies) - [ ] Update [Akka Projection gRPC samples](https://github.com/akka/akka-projection/tree/main/samples/grpc) - Close this issue diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala index 35b5f9d7..69fcaad7 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala @@ -327,7 +327,7 @@ class MigrationTool(system: ActorSystem[_]) { seqNr <- { // We could load the timestamp and tags from corresponding event, see R2dbcSnapshotStore.saveAsync, // but when enabling eventsBySlicesStartingFromSnapshots the sql updates should anyway be run. 
- // See https://doc.akka.io/docs/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots + // See https://doc.akka.io/libraries/akka-persistence-r2dbc/current/migration-guide.html#eventsBySlicesStartingFromSnapshots val serializedRow = serializedSnapotRow(selectedSnapshot) targetSnapshotDao .store(serializedRow) From 591ccff4b11c64226358757b5e9136afe2ec81ca Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Tue, 24 Sep 2024 09:34:55 +0200 Subject: [PATCH 3/4] docs: configure links in ScalaDoc; run link validator (#602) --- .github/workflows/link-validator.yml | 46 ++++++++++++++++++++++++++++ .github/workflows/publish.yml | 2 +- build.sbt | 14 ++++++++- project/Dependencies.scala | 3 ++ project/project-info.conf | 4 --- scripts/link-validator.conf | 36 ++++++++++++++++++++++ 6 files changed, 99 insertions(+), 6 deletions(-) create mode 100644 .github/workflows/link-validator.yml create mode 100644 scripts/link-validator.conf diff --git a/.github/workflows/link-validator.yml b/.github/workflows/link-validator.yml new file mode 100644 index 00000000..6ce185fd --- /dev/null +++ b/.github/workflows/link-validator.yml @@ -0,0 +1,46 @@ +name: Link Validator + +on: + workflow_dispatch: + pull_request: + schedule: + - cron: '0 6 * * 1' + +permissions: + contents: read + +jobs: + validate-links: + runs-on: ubuntu-22.04 + if: github.repository == 'akka/akka-persistence-r2dbc' + steps: + - name: Checkout + # https://github.com/actions/checkout/releases + # v4.1.1 + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 + with: + # See https://github.com/actions/checkout/issues/299#issuecomment-677674415 + ref: ${{ github.event.pull_request.head.sha }} + fetch-depth: 100 + + - name: Fetch tags + run: git fetch --depth=100 origin +refs/tags/*:refs/tags/* + + - name: Cache Coursier cache + # https://github.com/coursier/cache-action/releases + # v6.4.5 + uses: coursier/cache-action@1ff273bff02a8787bc9f1877d347948af647956d + + - name: Set up JDK 17 + # https://github.com/coursier/setup-action/releases + # v1.3.5 + uses: coursier/setup-action@7bde40eee928896f074dbb76d22dd772eed5c65f + with: + jvm: temurin:1.17.0.5 + apps: cs + + - name: sbt site + run: sbt docs/makeSite + + - name: Run Link Validator + run: cs launch net.runne::site-link-validator:0.2.5 -- scripts/link-validator.conf diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 5306b505..91a19d00 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -62,7 +62,7 @@ jobs: # v1.3.5 uses: coursier/setup-action@7bde40eee928896f074dbb76d22dd772eed5c65f with: - jvm: temurin:1.11 + jvm: temurin:1.17 - name: Publish run: |- diff --git a/build.sbt b/build.sbt index 8062d27b..c6af7e73 100644 --- a/build.sbt +++ b/build.sbt @@ -66,6 +66,18 @@ def common: Seq[Setting[_]] = "-Xms1G" :: "-Xmx1G" :: "-XX:MaxDirectMemorySize=256M" :: akkaProperties }, projectInfoVersion := (if (isSnapshot.value) "snapshot" else version.value), + Compile / doc / scalacOptions := scalacOptions.value ++ Seq( + "-doc-title", + "Akka Persistence R2DBC", + "-doc-version", + version.value) ++ { + // make use of https://github.com/scala/scala/pull/8663 + if (scalaBinaryVersion.value.startsWith("3")) { + Seq(s"-external-mappings:https://docs.oracle.com/en/java/javase/${Dependencies.JavaDocLinkVersion}/docs/api") + } else { + Seq("-jdk-api-doc-base", 
s"https://docs.oracle.com/en/java/javase/${Dependencies.JavaDocLinkVersion}/docs/api") + } + }, Global / excludeLintKeys += projectInfoVersion, Global / excludeLintKeys += mimaReportSignatureProblems, Global / excludeLintKeys += mimaPreviousArtifacts, @@ -152,7 +164,7 @@ lazy val docs = project "extref.akka.base_url" -> s"https://doc.akka.io/libraries/akka-core/${Dependencies.AkkaVersionInDocs}/%s", "extref.akka-docs.base_url" -> s"https://doc.akka.io/libraries/akka-core/${Dependencies.AkkaVersionInDocs}/%s", "extref.akka-projection.base_url" -> s"https://doc.akka.io/libraries/akka-projection/${Dependencies.AkkaProjectionVersionInDocs}/%s", - "extref.java-docs.base_url" -> "https://docs.oracle.com/en/java/javase/11/%s", + "extref.java-docs.base_url" -> s"https://docs.oracle.com/en/java/javase/${Dependencies.JavaDocLinkVersion}/%s", "scaladoc.scala.base_url" -> s"https://www.scala-lang.org/api/current/", "scaladoc.akka.persistence.r2dbc.base_url" -> s"/${(Preprocess / siteSubdirName).value}/", "javadoc.akka.persistence.r2dbc.base_url" -> "", // no Javadoc is published diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 51692ea9..7b3d44c4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,6 +17,9 @@ object Dependencies { val R2dbcH2Version = "1.0.0.RELEASE" val SqlServerR2dbcVersion = "1.0.2.RELEASE" val SqlServerJdbcVersion = "7.4.1.jre8" + // Java Platform version for JavaDoc creation + // sync with Java version in .github/workflows/release.yml#documentation + val JavaDocLinkVersion = 17 object Compile { val akkaActorTyped = "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion diff --git a/project/project-info.conf b/project/project-info.conf index 5017151a..9e7c076e 100644 --- a/project/project-info.conf +++ b/project/project-info.conf @@ -15,10 +15,6 @@ project-info { text: "Lightbend Discuss" url: "https://discuss.lightbend.com/c/akka/" } - { - text: "akka/akka Gitter channel" - url: "https://gitter.im/akka/akka" - } ] } core: ${project-info.shared-info} { diff --git a/scripts/link-validator.conf b/scripts/link-validator.conf new file mode 100644 index 00000000..61d41e07 --- /dev/null +++ b/scripts/link-validator.conf @@ -0,0 +1,36 @@ +// config for https://github.com/ennru/site-link-validator/ +site-link-validator { + root-dir = "./docs/target/site/" + # relative to `root-dir` + start-file = "libraries/akka-persistence-r2dbc/snapshot/index.html" + + # Resolves URLs with the given prefix as local files instead + link-mappings = [ + { + prefix = "https://doc.akka.io/libraries/akka-persistence-r2dbc/snapshot/" + replace = "/libraries/akka-persistence-r2dbc/snapshot/" + } + { + prefix = "https://doc.akka.io/api/akka-persistence-r2dbc/snapshot/" + replace = "/api/akka-persistence-r2dbc/snapshot/" + } + ] + + ignore-missing-local-files-regex = "" + + ignore-files = [] + + ignore-prefixes = [ + # GitHub will block with "429 Too Many Requests" + "https://github.com/akka/akka-persistence-r2dbc/" + # MVN repository forbids access after a few requests + "https://mvnrepository.com/artifact/", + "https://repo.akka.io/", + # created in api/akka-persistence-r2dbc/snapshot/akka/persistence/r2dbc/internal/Sql$.html + "https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/String$.html" + ] + + non-https-whitelist = [ +// "http://logback.qos.ch/" + ] +} From e1163f56076f6422a479b1b22f48098a41f0cb78 Mon Sep 17 00:00:00 2001 From: Sebastian Alfers Date: Mon, 30 Sep 2024 17:20:37 +0200 Subject: [PATCH 4/4] chore: bump to 
akka 2.10.0-M1 (#603) --- .jvmopts | 2 +- .../r2dbc/ConnectionFactoryProvider.scala | 2 +- .../persistence/r2dbc/R2dbcSettings.scala | 30 +++++++++---------- .../cleanup/javadsl/DurableStateCleanup.scala | 8 ++--- .../cleanup/javadsl/EventSourcedCleanup.scala | 26 ++++++++-------- .../scaladsl/DurableStateCleanup.scala | 15 ++++------ .../scaladsl/EventSourcedCleanup.scala | 5 ++-- .../r2dbc/internal/BySliceQuery.scala | 19 ++++++------ .../r2dbc/internal/ChangeHandlerFactory.scala | 4 +-- .../r2dbc/internal/ContinuousQuery.scala | 5 ++-- .../r2dbc/internal/R2dbcExecutor.scala | 26 ++++++++-------- .../r2dbc/internal/h2/H2Dialect.scala | 2 +- .../r2dbc/internal/h2/H2JournalDao.scala | 9 ++---- .../internal/postgres/PostgresDialect.scala | 10 +++---- .../postgres/PostgresDurableStateDao.scala | 11 +++---- .../postgres/PostgresJournalDao.scala | 20 ++++++------- .../internal/postgres/PostgresQueryDao.scala | 5 ++-- .../postgres/PostgresSnapshotDao.scala | 12 ++++---- .../internal/sqlserver/SqlServerDialect.scala | 4 +-- .../r2dbc/journal/R2dbcJournal.scala | 7 ++--- .../query/javadsl/R2dbcReadJournal.scala | 12 ++++---- .../query/scaladsl/R2dbcReadJournal.scala | 19 ++++++------ .../r2dbc/session/javadsl/R2dbcSession.scala | 18 +++++------ .../javadsl/R2dbcDurableStateStore.scala | 18 +++++------ .../scaladsl/R2dbcDurableStateStore.scala | 3 +- .../akka/persistence/r2dbc/TestActors.scala | 11 ++++--- .../query/EventsBySliceBacktrackingSpec.scala | 3 +- .../r2dbc/query/EventsBySliceSpec.scala | 3 +- .../jdocs/home/cleanup/CleanupDocExample.java | 5 ++-- .../r2dbc/migration/MigrationTool.scala | 25 ++++++---------- .../r2dbc/migration/MigrationToolDao.scala | 6 ++-- native-image-tests/build.sbt | 4 +-- project/Dependencies.scala | 10 +++---- 33 files changed, 163 insertions(+), 196 deletions(-) diff --git a/.jvmopts b/.jvmopts index 60c96d09..61edfa41 100644 --- a/.jvmopts +++ b/.jvmopts @@ -2,4 +2,4 @@ -Xms2G -Xmx2G --Xss2M +-Xss2M \ No newline at end of file diff --git a/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala b/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala index 2465a5ac..839b3d9d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/ConnectionFactoryProvider.scala @@ -17,9 +17,9 @@ import io.r2dbc.spi.ConnectionFactory import java.time.{ Duration => JDuration } import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.duration.Duration +import scala.jdk.CollectionConverters._ import scala.util.Failure import scala.util.Success diff --git a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala index 05d7e746..c52e7f9d 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/R2dbcSettings.scala @@ -4,7 +4,10 @@ package akka.persistence.r2dbc +import scala.concurrent.duration._ import scala.collection.immutable +import scala.jdk.DurationConverters._ + import akka.annotation.InternalApi import akka.annotation.InternalStableApi import akka.persistence.r2dbc.internal.codec.IdentityAdapter @@ -14,12 +17,9 @@ import akka.persistence.r2dbc.internal.codec.QueryAdapter import akka.persistence.r2dbc.internal.codec.TagsCodec import akka.persistence.r2dbc.internal.codec.TimestampCodec import 
akka.persistence.r2dbc.internal.ConnectionFactorySettings -import akka.util.JavaDurationConverters._ import com.typesafe.config.Config import java.util.Locale -import scala.collection.immutable -import scala.concurrent.duration._ /** * INTERNAL API @@ -61,7 +61,7 @@ object R2dbcSettings { configToMap(config.getConfig("state.custom-table")) val durableStateAdditionalColumnClasses: Map[String, immutable.IndexedSeq[String]] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ val cfg = config.getConfig("state.additional-columns") cfg.root.unwrapped.asScala.toMap.map { case (k, v: java.util.List[_]) => k -> v.iterator.asScala.map(_.toString).toVector @@ -126,7 +126,7 @@ object R2dbcSettings { val logDbCallsExceeding: FiniteDuration = config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match { case "off" => -1.millis - case _ => config.getDuration("log-db-calls-exceeding").asScala + case _ => config.getDuration("log-db-calls-exceeding").toScala } val codecSettings = { @@ -190,7 +190,7 @@ object R2dbcSettings { } private def configToMap(cfg: Config): Map[String, String] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ cfg.root.unwrapped.asScala.toMap.map { case (k, v) => k -> v.toString } } @@ -484,11 +484,11 @@ final class R2dbcSettings private ( */ @InternalStableApi final class QuerySettings(config: Config) { - val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").asScala - val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").asScala + val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala + val behindCurrentTime: FiniteDuration = config.getDuration("behind-current-time").toScala val backtrackingEnabled: Boolean = config.getBoolean("backtracking.enabled") - val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").asScala - val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").asScala + val backtrackingWindow: FiniteDuration = config.getDuration("backtracking.window").toScala + val backtrackingBehindCurrentTime: FiniteDuration = config.getDuration("backtracking.behind-current-time").toScala val bufferSize: Int = config.getInt("buffer-size") val persistenceIdsBufferSize: Int = config.getInt("persistence-ids.buffer-size") val deduplicateCapacity: Int = config.getInt("deduplicate-capacity") @@ -502,10 +502,10 @@ final class QuerySettings(config: Config) { final class ConnectionPoolSettings(config: Config) { val initialSize: Int = config.getInt("initial-size") val maxSize: Int = config.getInt("max-size") - val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").asScala - val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").asScala + val maxIdleTime: FiniteDuration = config.getDuration("max-idle-time").toScala + val maxLifeTime: FiniteDuration = config.getDuration("max-life-time").toScala - val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").asScala + val acquireTimeout: FiniteDuration = config.getDuration("acquire-timeout").toScala val acquireRetry: Int = config.getInt("acquire-retry") val validationQuery: String = config.getString("validation-query") @@ -513,7 +513,7 @@ final class ConnectionPoolSettings(config: Config) { val closeCallsExceeding: Option[FiniteDuration] = config.getString("close-calls-exceeding").toLowerCase(Locale.ROOT) match { case "off" => None - case _ => 
Some(config.getDuration("close-calls-exceeding").asScala) + case _ => Some(config.getDuration("close-calls-exceeding").toScala) } } @@ -523,7 +523,7 @@ final class ConnectionPoolSettings(config: Config) { @InternalStableApi final class PublishEventsDynamicSettings(config: Config) { val throughputThreshold: Int = config.getInt("throughput-threshold") - val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").asScala + val throughputCollectInterval: FiniteDuration = config.getDuration("throughput-collect-interval").toScala } /** diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala index 73357eaf..6357e93e 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/DurableStateCleanup.scala @@ -7,8 +7,8 @@ package akka.persistence.r2dbc.cleanup.javadsl import java.util.concurrent.CompletionStage import java.util.{ List => JList } -import scala.collection.JavaConverters._ -import scala.compat.java8.FutureConverters._ +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ import akka.Done import akka.actor.ClassicActorSystemProvider @@ -43,14 +43,14 @@ final class DurableStateCleanup private (delegate: scaladsl.DurableStateCleanup) * Delete the state related to one single `persistenceId`. */ def deleteState(persistenceId: String, resetRevisionNumber: Boolean): CompletionStage[Done] = { - delegate.deleteState(persistenceId, resetRevisionNumber).toJava + delegate.deleteState(persistenceId, resetRevisionNumber).asJava } /** * Delete all states related to the given list of `persistenceIds`. */ def deleteStates(persistenceIds: JList[String], resetRevisionNumber: Boolean): CompletionStage[Done] = { - delegate.deleteStates(persistenceIds.asScala.toVector, resetRevisionNumber).toJava + delegate.deleteStates(persistenceIds.asScala.toVector, resetRevisionNumber).asJava } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala index 3e94e8fd..811a96e5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/javadsl/EventSourcedCleanup.scala @@ -8,8 +8,8 @@ import java.time.Instant import java.util.concurrent.CompletionStage import java.util.{ List => JList } -import scala.collection.JavaConverters._ -import scala.compat.java8.FutureConverters._ +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ import akka.Done import akka.actor.ClassicActorSystemProvider @@ -53,19 +53,19 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup) * sequence nr (inclusive) to delete up to */ def deleteEventsTo(persistenceId: String, toSequenceNr: Long): CompletionStage[Done] = - delegate.deleteEventsTo(persistenceId, toSequenceNr).toJava + delegate.deleteEventsTo(persistenceId, toSequenceNr).asJava /** * Delete all events related to one single `persistenceId`. Snapshots are not deleted. 
*/ def deleteAllEvents(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAllEvents(persistenceId, resetSequenceNumber).toJava + delegate.deleteAllEvents(persistenceId, resetSequenceNumber).asJava /** * Delete all events related to the given list of `persistenceIds`. Snapshots are not deleted. */ def deleteAllEvents(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).toJava + delegate.deleteAllEvents(persistenceIds.asScala.toVector, resetSequenceNumber).asJava /** * Delete events before a timestamp for the given persistence id. Snapshots are not deleted. @@ -83,7 +83,7 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup) * timestamp (exclusive) to delete up to */ def deleteEventsBefore(persistenceId: String, timestamp: Instant): CompletionStage[Done] = - delegate.deleteEventsBefore(persistenceId, timestamp).toJava + delegate.deleteEventsBefore(persistenceId, timestamp).asJava /** * Delete events before a timestamp for the given entityType and slice. Snapshots are not deleted. @@ -103,43 +103,43 @@ final class EventSourcedCleanup private (delegate: scaladsl.EventSourcedCleanup) * timestamp (exclusive) to delete up to */ def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): CompletionStage[Done] = - delegate.deleteEventsBefore(entityType, slice, timestamp).toJava + delegate.deleteEventsBefore(entityType, slice, timestamp).asJava /** * Delete snapshots related to one single `persistenceId`. Events are not deleted. */ def deleteSnapshot(persistenceId: String): CompletionStage[Done] = - delegate.deleteSnapshot(persistenceId).toJava + delegate.deleteSnapshot(persistenceId).asJava /** * Delete all snapshots related to the given list of `persistenceIds`. Events are not deleted. */ def deleteSnapshots(persistenceIds: JList[String]): CompletionStage[Done] = - delegate.deleteSnapshots(persistenceIds.asScala.toVector).toJava + delegate.deleteSnapshots(persistenceIds.asScala.toVector).asJava /** * Deletes all events for the given persistence id from before the snapshot. The snapshot is not deleted. The event * with the same sequence number as the remaining snapshot is deleted. */ def cleanupBeforeSnapshot(persistenceId: String): CompletionStage[Done] = - delegate.cleanupBeforeSnapshot(persistenceId).toJava + delegate.cleanupBeforeSnapshot(persistenceId).asJava /** * See single persistenceId overload for what is done for each persistence id */ def cleanupBeforeSnapshot(persistenceIds: JList[String]): CompletionStage[Done] = - delegate.cleanupBeforeSnapshot(persistenceIds.asScala.toVector).toJava + delegate.cleanupBeforeSnapshot(persistenceIds.asScala.toVector).asJava /** * Delete everything related to one single `persistenceId`. All events and snapshots are deleted. */ def deleteAll(persistenceId: String, resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAll(persistenceId, resetSequenceNumber).toJava + delegate.deleteAll(persistenceId, resetSequenceNumber).asJava /** * Delete everything related to the given list of `persistenceIds`. All events and snapshots are deleted. 
*/ def deleteAll(persistenceIds: JList[String], resetSequenceNumber: Boolean): CompletionStage[Done] = - delegate.deleteAll(persistenceIds.asScala.toVector, resetSequenceNumber).toJava + delegate.deleteAll(persistenceIds.asScala.toVector, resetSequenceNumber).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala index 5d7b6cf8..cc02554c 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/DurableStateCleanup.scala @@ -4,20 +4,17 @@ package akka.persistence.r2dbc.cleanup.scaladsl -import scala.collection.immutable +import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.collection.immutable import scala.util.Failure import scala.util.Success - import org.slf4j.LoggerFactory - import akka.Done import akka.actor.ClassicActorSystemProvider import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.R2dbcExecutorProvider @@ -73,7 +70,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf if (resetRevisionNumber) stateDao .deleteState(persistenceId, revision = 0L, changeEvent = None) // hard delete without revision check - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) else { stateDao.readState(persistenceId).flatMap { case None => @@ -81,7 +78,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf case Some(s) => stateDao .deleteState(persistenceId, s.revision + 1, changeEvent = None) - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } } } @@ -106,7 +103,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf case pid :: tail => pidOperation(pid).flatMap { _ => if (n % settings.cleanupSettings.logProgressEvery == 0) - log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size) + log.info("Cleanup {} [{}] of [{}].", operationName, n, size) loop(tail, n + 1) } } @@ -116,7 +113,7 @@ final class DurableStateCleanup(systemProvider: ClassicActorSystemProvider, conf result.onComplete { case Success(_) => - log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size) + log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size) case Failure(e) => log.error(s"Cleanup {$operationName} failed.", e) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala index 18660af5..02fb4be6 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/cleanup/scaladsl/EventSourcedCleanup.scala @@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory import akka.Done import akka.actor.ClassicActorSystemProvider import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.ApiMayChange import akka.annotation.InternalApi import akka.persistence.SnapshotSelectionCriteria @@ -209,7 +208,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf case 
pid :: tail => pidOperation(pid).flatMap { _ => if (n % settings.cleanupSettings.logProgressEvery == 0) - log.infoN("Cleanup {} [{}] of [{}].", operationName, n, size) + log.info("Cleanup {} [{}] of [{}].", operationName, n, size) loop(tail, n + 1) } } @@ -219,7 +218,7 @@ final class EventSourcedCleanup(systemProvider: ClassicActorSystemProvider, conf result.onComplete { case Success(_) => - log.info2("Cleanup completed {} of [{}] persistenceId.", operationName, size) + log.info("Cleanup completed {} of [{}] persistenceId.", operationName, size) case Failure(e) => log.error(s"Cleanup {$operationName} failed.", e) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala index 6f9164d1..ff14e3bf 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/BySliceQuery.scala @@ -13,7 +13,6 @@ import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration import akka.NotUsed -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.persistence.query.Offset import akka.persistence.query.TimestampOffset @@ -250,7 +249,7 @@ import org.slf4j.Logger } if (state.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "{} next query [{}] from slices [{} - {}], between time [{} - {}]. Found [{}] rows in previous query.", logPrefix, state.queryCount, @@ -277,7 +276,7 @@ import org.slf4j.Logger .via(deserializeAndAddOffset(state.latest))) } else { if (log.isDebugEnabled) - log.debugN( + log.debug( "{} query [{}] from slices [{} - {}] completed. Found [{}] rows in previous query.", logPrefix, state.queryCount, @@ -297,7 +296,7 @@ import org.slf4j.Logger .futureSource[Envelope, NotUsed] { currentTimestamp.map { currentTime => if (log.isDebugEnabled()) - log.debugN( + log.debug( "{} query slices [{} - {}], from time [{}] until now [{}].", logPrefix, minSlice, @@ -326,7 +325,7 @@ import org.slf4j.Logger val initialOffset = toTimestampOffset(offset) if (log.isDebugEnabled()) - log.debugN( + log.debug( "Starting {} query from slices [{} - {}], from time [{}].", logPrefix, minSlice, @@ -359,7 +358,7 @@ import org.slf4j.Logger if (log.isDebugEnabled()) { if (state.latestBacktracking.seen.nonEmpty && offset.timestamp.isAfter(state.latestBacktracking.timestamp.plus(firstBacktrackingQueryWindow))) - log.debugN( + log.debug( "{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]", logPrefix, state.latestBacktracking, @@ -382,7 +381,7 @@ import org.slf4j.Logger if (log.isDebugEnabled) delay.foreach { d => - log.debugN( + log.debug( "{} query [{}] from slices [{} - {}] delay next [{}] ms.", logPrefix, state.queryCount, @@ -466,7 +465,7 @@ import org.slf4j.Logger " in backtracking mode," else "" - log.debugN( + log.debug( "{} next query [{}]{} from slices [{} - {}], between time [{} - {}]. 
{}", logPrefix, newState.queryCount, @@ -540,7 +539,7 @@ import org.slf4j.Logger val newState = state.copy(buckets = newBuckets) if (log.isDebugEnabled) { val sum = counts.iterator.map { case Bucket(_, count) => count }.sum - log.debugN( + log.debug( "{} retrieved [{}] event count buckets, with a total of [{}], from slices [{} - {}], from time [{}]", logPrefix, counts.size, @@ -571,7 +570,7 @@ import org.slf4j.Logger throw new IllegalStateException( s"Too many events stored with the same timestamp [$currentTimestamp], buffer size [${settings.querySettings.bufferSize}]") } - log.traceN( + log.trace( "filtering [{}] [{}] as db timestamp is the same as last offset and is in seen [{}]", row.persistenceId, row.seqNr, diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala index d192e52f..9c1467a9 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/ChangeHandlerFactory.scala @@ -4,8 +4,8 @@ package akka.persistence.r2dbc.internal -import scala.compat.java8.FutureConverters._ import scala.concurrent.Future +import scala.jdk.FutureConverters._ import scala.util.Try import akka.Done @@ -29,7 +29,7 @@ import akka.persistence.r2dbc.state.scaladsl.ChangeHandler override def process(session: R2dbcSession, change: DurableStateChange[Any]): Future[Done] = { val javadslSession = new akka.persistence.r2dbc.session.javadsl.R2dbcSession(session.connection)(session.ec, session.system) - delegate.process(javadslSession, change).toScala + delegate.process(javadslSession, change).asScala } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala index 5eb297c3..5d93e034 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/ContinuousQuery.scala @@ -4,15 +4,14 @@ package akka.persistence.r2dbc.internal +import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.NotUsed import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.stream.Attributes import akka.stream.Outlet import akka.stream.SourceShape @@ -117,7 +116,7 @@ final private[r2dbc] class ContinuousQuery[S, T]( beforeQuery(state) match { case None => runNextQuery() case Some(beforeQueryFuture) => - beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContexts.parasitic) + beforeQueryFuture.onComplete(beforeQueryCallback.invoke)(ExecutionContext.parasitic) } } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala index cd4391b4..5bad2781 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -9,11 +9,11 @@ import java.util.function.BiConsumer import scala.collection.immutable import scala.collection.immutable.IntMap import scala.collection.mutable -import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ +import 
scala.jdk.FutureConverters._ import scala.util.Failure import scala.util.Success import scala.util.control.Exception.Catcher @@ -21,9 +21,7 @@ import scala.util.control.NonFatal import akka.Done import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalStableApi -import akka.dispatch.ExecutionContexts import io.r2dbc.spi.Connection import io.r2dbc.spi.ConnectionFactory import io.r2dbc.spi.Result @@ -44,17 +42,17 @@ import akka.persistence.r2dbc.R2dbcSettings @InternalStableApi object R2dbcExecutor { final implicit class PublisherOps[T](val publisher: Publisher[T]) extends AnyVal { def asFuture(): Future[T] = - Mono.from(publisher).toFuture.toScala + Mono.from(publisher).toFuture.asScala def asFutureDone(): Future[Done] = { val mono: Mono[Done] = Mono.from(publisher).map(_ => Done) - mono.defaultIfEmpty(Done).toFuture.toScala + mono.defaultIfEmpty(Done).toFuture.asScala } } def updateOneInTx(stmt: Statement)(implicit ec: ExecutionContext): Future[Long] = stmt.execute().asFuture().flatMap { result => - result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContexts.parasitic) + result.getRowsUpdated.asFuture().map(_.longValue())(ExecutionContext.parasitic) } def updateOneReturningInTx[A](stmt: Statement, mapRow: Row => A)(implicit ec: ExecutionContext): Future[A] = @@ -79,7 +77,7 @@ import akka.persistence.r2dbc.R2dbcSettings statements.foldLeft(Future.successful(Vector.empty[Long])) { (acc, stmt) => acc.flatMap { seq => stmt.execute().asFuture().flatMap { res => - res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContexts.parasitic) + res.getRowsUpdated.asFuture().map(seq :+ _.longValue())(ExecutionContext.parasitic) } } } @@ -142,7 +140,7 @@ class R2dbcExecutor( if (durationMicros >= logDbCallsExceedingMicros) log.info("{} - getConnection took [{}] µs", logPrefix, durationMicros) connection - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } /** @@ -220,7 +218,7 @@ class R2dbcExecutor( def updateInBatchReturning[A](logPrefix: String)( statementFactory: Connection => Statement, mapRow: Row => A): Future[immutable.IndexedSeq[A]] = { - import scala.collection.JavaConverters._ + import scala.jdk.CollectionConverters._ withConnection(logPrefix) { connection => val stmt = statementFactory(connection) Flux @@ -251,7 +249,7 @@ class R2dbcExecutor( } mappedRows.failed.foreach { exc => - log.debug2("{} - Select failed: {}", logPrefix, exc) + log.debug("{} - Select failed: {}", logPrefix, exc) connection.close().asFutureDone() } @@ -259,7 +257,7 @@ class R2dbcExecutor( connection.close().asFutureDone().map { _ => val durationMicros = durationInMicros(startTime) if (durationMicros >= logDbCallsExceedingMicros) - log.infoN("{} - Selected [{}] rows in [{}] µs", logPrefix, r.size, durationMicros) + log.info("{} - Selected [{}] rows in [{}] µs", logPrefix, r.size, durationMicros) r } } @@ -290,7 +288,7 @@ class R2dbcExecutor( result.failed.foreach { exc => if (log.isDebugEnabled()) - log.debug2("{} - DB call failed: {}", logPrefix, exc.toString) + log.debug("{} - DB call failed: {}", logPrefix, exc.toString) // ok to rollback async like this, or should it be before completing the returned Future? 
val done = rollbackAndClose(connection) timeoutTask.foreach { task => done.onComplete(_ => task.cancel()) } @@ -332,7 +330,7 @@ class R2dbcExecutor( } result.failed.foreach { exc => - log.debug2("{} - DB call failed: {}", logPrefix, exc) + log.debug("{} - DB call failed: {}", logPrefix, exc) // auto-commit so nothing to rollback val done = connection.close().asFutureDone() timeoutTask.foreach { task => done.onComplete(_ => task.cancel()) } @@ -342,7 +340,7 @@ class R2dbcExecutor( val done = connection.close().asFutureDone().map { _ => val durationMicros = durationInMicros(startTime) if (durationMicros >= logDbCallsExceedingMicros) - log.infoN("{} - DB call completed [{}] in [{}] µs", logPrefix, r, durationMicros) + log.info("{} - DB call completed [{}] in [{}] µs", logPrefix, r, durationMicros) r } timeoutTask.foreach { task => done.onComplete(_ => task.cancel()) } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala index c93dffa9..f89e5e05 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2Dialect.scala @@ -14,7 +14,6 @@ import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.SnapshotDao import akka.persistence.r2dbc.internal.Sql.InterpolationWithAdapter -import akka.util.ccompat.JavaConverters._ import com.typesafe.config.Config import io.r2dbc.h2.H2ConnectionConfiguration import io.r2dbc.h2.H2ConnectionFactory @@ -23,6 +22,7 @@ import io.r2dbc.spi.ConnectionFactory import java.util.Locale import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ import akka.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsProvider import akka.persistence.r2dbc.internal.R2dbcExecutorProvider diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala index 6a548399..db12b37b 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/h2/H2JournalDao.scala @@ -5,16 +5,13 @@ package akka.persistence.r2dbc.internal.h2 import java.time.Instant - +import scala.concurrent.ExecutionContext import scala.concurrent.Future - import io.r2dbc.spi.Connection import io.r2dbc.spi.Statement import org.slf4j.Logger import org.slf4j.LoggerFactory - import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.r2dbc.internal.R2dbcExecutorProvider @@ -81,7 +78,7 @@ private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider) result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", 1, events.head.persistenceId) } - result.map(_ => events.head.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => events.head.dbTimestamp)(ExecutionContext.parasitic) } override def writeEventInTx(event: SerializedJournalRow, connection: Connection): Future[Instant] = { @@ -95,7 +92,7 @@ private[r2dbc] class H2JournalDao(executorProvider: R2dbcExecutorProvider) result.foreach { _ => log.debug("Wrote [{}] event for persistenceId [{}]", 1, persistenceId) } - result.map(_ => event.dbTimestamp)(ExecutionContexts.parasitic) + result.map(_ => 
event.dbTimestamp)(ExecutionContext.parasitic) } private def bindInsertStatement(stmt: Statement, write: SerializedJournalRow): Statement = { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala index 7c1c7aab..16087f26 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDialect.scala @@ -9,6 +9,7 @@ import java.util.Locale import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration +import scala.jdk.DurationConverters._ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -18,7 +19,6 @@ import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.SnapshotDao -import akka.util.JavaDurationConverters.JavaDurationOps import com.typesafe.config.Config import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider import io.r2dbc.postgresql.client.SSLMode @@ -57,13 +57,13 @@ private[r2dbc] object PostgresDialect extends Dialect { val sslKey: String = config.getString("ssl.key") val sslPassword: String = config.getString("ssl.password") - val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").asScala + val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").toScala val statementCacheSize: Int = config.getInt("statement-cache-size") val statementTimeout: Option[FiniteDuration] = config.getString("statement-timeout").toLowerCase(Locale.ROOT) match { case "off" => None - case _ => Some(config.getDuration("statement-timeout").asScala) + case _ => Some(config.getDuration("statement-timeout").toScala) } } @@ -95,8 +95,8 @@ private[r2dbc] object PostgresDialect extends Dialect { Integer.valueOf(settings.statementCacheSize)) settings.statementTimeout.foreach { timeout => - import akka.util.JavaDurationConverters._ - builder.option(PostgresqlConnectionFactoryProvider.STATEMENT_TIMEOUT, timeout.asJava) + import scala.jdk.DurationConverters._ + builder.option(PostgresqlConnectionFactoryProvider.STATEMENT_TIMEOUT, timeout.toJava) } if (settings.sslEnabled) { diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala index 74faf857..1fcd12f5 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresDurableStateDao.scala @@ -25,9 +25,7 @@ import org.slf4j.LoggerFactory import akka.Done import akka.NotUsed import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.query.DeletedDurableState import akka.persistence.query.DurableStateChange @@ -475,7 +473,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv changeEvent: Option[SerializedJournalRow]): Future[Option[Instant]] = { if (revision == 0) { hardDeleteState(persistenceId) - .map(_ => None)(ExecutionContexts.parasitic) + .map(_ => None)(ExecutionContext.parasitic) } else { val slice = persistenceExt.sliceForPersistenceId(persistenceId) val executor = 
executorProvider.executorFor(slice) @@ -612,7 +610,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv if (log.isDebugEnabled()) result.foreach(_ => log.debug("Hard deleted durable state for persistenceId [{}]", persistenceId)) - result.map(_ => Done)(ExecutionContexts.parasitic) + result.map(_ => Done)(ExecutionContext.parasitic) } override def currentDbTimestamp(slice: Int): Future[Instant] = { @@ -708,8 +706,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv )) if (log.isDebugEnabled) - result.foreach(rows => - log.debugN("Read [{}] durable states from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] durable states from slices [{} - {}]", rows.size, minSlice, maxSlice)) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -890,7 +887,7 @@ private[r2dbc] class PostgresDurableStateDao(executorProvider: R2dbcExecutorProv }) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) result diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala index 56ab776b..7f6990a7 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresJournalDao.scala @@ -16,9 +16,7 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings import akka.persistence.r2dbc.internal.JournalDao @@ -202,7 +200,7 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) result.foreach { _ => log.debug("Wrote [{}] events for persistenceId [{}]", totalEvents, persistenceId) } - result.map(_.head)(ExecutionContexts.parasitic) + result.map(_.head)(ExecutionContext.parasitic) } } @@ -295,7 +293,7 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) val seqNr = row.get(0, classOf[java.lang.Long]) if (seqNr eq null) 0L else seqNr.longValue }) - .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic) + .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContext.parasitic) if (log.isDebugEnabled) result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr)) @@ -316,7 +314,7 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) val seqNr = row.get(0, classOf[java.lang.Long]) if (seqNr eq null) 0L else seqNr.longValue }) - .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic) + .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContext.parasitic) if (log.isDebugEnabled) result.foreach(seqNr => log.debug("Lowest sequence nr for persistenceId [{}]: [{}]", persistenceId, seqNr)) @@ -378,13 +376,13 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) } }).map(deletedRows => if (log.isDebugEnabled) { - log.debugN( + log.debug( "Deleted [{}] events for persistenceId [{}], from seq num [{}] to [{}]", deletedRows, persistenceId, from, to) - 
})(ExecutionContexts.parasitic) + })(ExecutionContext.parasitic) } val batchSize = settings.cleanupSettings.eventsJournalDeleteBatchSize @@ -416,8 +414,8 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) .bindTimestamp(1, timestamp) } .map(deletedRows => - log.debugN("Deleted [{}] events for persistenceId [{}], before [{}]", deletedRows, persistenceId, timestamp))( - ExecutionContexts.parasitic) + log.debug("Deleted [{}] events for persistenceId [{}], before [{}]", deletedRows, persistenceId, timestamp))( + ExecutionContext.parasitic) } override def deleteEventsBefore(entityType: String, slice: Int, timestamp: Instant): Future[Unit] = { @@ -431,12 +429,12 @@ private[r2dbc] class PostgresJournalDao(executorProvider: R2dbcExecutorProvider) .bindTimestamp(2, timestamp) } .map(deletedRows => - log.debugN( + log.debug( "Deleted [{}] events for entityType [{}], slice [{}], before [{}]", deletedRows, entityType, slice, - timestamp))(ExecutionContexts.parasitic) + timestamp))(ExecutionContext.parasitic) } } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala index 84b97c03..c6f73e49 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresQueryDao.scala @@ -18,7 +18,6 @@ import org.slf4j.LoggerFactory import akka.NotUsed import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi import akka.persistence.Persistence import akka.persistence.r2dbc.R2dbcSettings @@ -299,7 +298,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e metadata = readMetadata(row))) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] events from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] events from slices [{} - {}]", rows.size, minSlice, maxSlice)) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -348,7 +347,7 @@ private[r2dbc] class PostgresQueryDao(executorProvider: R2dbcExecutorProvider) e }) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) result } diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala index f43386d2..1c7703ef 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/postgres/PostgresSnapshotDao.scala @@ -17,9 +17,7 @@ import org.slf4j.LoggerFactory import akka.NotUsed import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.persistence.Persistence import akka.persistence.SnapshotSelectionCriteria import akka.persistence.r2dbc.R2dbcSettings @@ -273,7 +271,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider statement }, collectSerializedSnapshot(entityType, _)) - .map(_.headOption)(ExecutionContexts.parasitic) + .map(_.headOption)(ExecutionContext.parasitic) } 
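// Every ExecutionContexts.parasitic -> ExecutionContext.parasitic replacement in
// this patch moves from the internal akka.dispatch.ExecutionContexts helper to
// scala.concurrent.ExecutionContext.parasitic (standard library, Scala 2.13.4+),
// which runs cheap continuations on the completing thread instead of scheduling
// them on a dispatcher. A minimal sketch of the idiom:
//
//   import scala.concurrent.{ ExecutionContext, Future }
//
//   def firstOrZero(rows: Future[Seq[Long]]): Future[Long] =
//     rows.map(r => if (r.isEmpty) 0L else r.head)(ExecutionContext.parasitic)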
protected def bindUpsertSql(statement: Statement, serializedRow: SerializedSnapshotRow): Statement = { @@ -322,7 +320,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider bindUpsertSql(statement, serializedRow) } - .map(_ => ())(ExecutionContexts.parasitic) + .map(_ => ())(ExecutionContext.parasitic) } def delete(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = { @@ -352,7 +350,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider } statement } - }.map(_ => ())(ExecutionContexts.parasitic) + }.map(_ => ())(ExecutionContext.parasitic) /** * This is used from `BySliceQuery`, i.e. only if settings.querySettings.startFromSnapshotEnabled @@ -406,7 +404,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider collectSerializedSnapshot(entityType, _)) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] snapshots from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] snapshots from slices [{} - {}]", rows.size, minSlice, maxSlice)) Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => NotUsed) } @@ -465,7 +463,7 @@ private[r2dbc] class PostgresSnapshotDao(executorProvider: R2dbcExecutorProvider }) if (log.isDebugEnabled) - result.foreach(rows => log.debugN("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) + result.foreach(rows => log.debug("Read [{}] bucket counts from slices [{} - {}]", rows.size, minSlice, maxSlice)) result diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala index ee6d5125..17723302 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/sqlserver/SqlServerDialect.scala @@ -8,6 +8,7 @@ import java.time.{ Duration => JDuration } import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration +import scala.jdk.DurationConverters._ import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -17,7 +18,6 @@ import akka.persistence.r2dbc.internal.DurableStateDao import akka.persistence.r2dbc.internal.JournalDao import akka.persistence.r2dbc.internal.QueryDao import akka.persistence.r2dbc.internal.SnapshotDao -import akka.util.JavaDurationConverters.JavaDurationOps import com.typesafe.config.Config import io.r2dbc.mssql.MssqlConnectionFactoryProvider import io.r2dbc.spi.ConnectionFactories @@ -44,7 +44,7 @@ private[r2dbc] object SqlServerDialect extends Dialect { val user: String = config.getString("user") val password: String = config.getString("password") val database: String = config.getString("database") - val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").asScala + val connectTimeout: FiniteDuration = config.getDuration("connect-timeout").toScala } diff --git a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala index 68ef5d79..dbd07a27 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/journal/R2dbcJournal.scala @@ -18,7 +18,6 @@ import akka.actor.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi -import 
akka.dispatch.ExecutionContexts import akka.event.Logging import akka.persistence.AtomicWrite import akka.persistence.Persistence @@ -196,7 +195,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends writeAndPublishResult.onComplete { _ => self ! WriteFinished(persistenceId, writeAndPublishResult) } - writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic) + writeAndPublishResult.map(_ => Nil)(ExecutionContext.parasitic) } private def publish(messages: immutable.Seq[AtomicWrite], dbTimestamp: Future[Instant]): Future[Done] = @@ -211,7 +210,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends } case None => - dbTimestamp.map(_ => Done)(ExecutionContexts.parasitic) + dbTimestamp.map(_ => Done)(ExecutionContext.parasitic) } override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { @@ -226,7 +225,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, cfgPath: String) extends case Some(f) => log.debug("Write in progress for [{}], deferring replayMessages until write completed", persistenceId) // we only want to make write - replay sequential, not fail if previous write failed - f.recover { case _ => Done }(ExecutionContexts.parasitic) + f.recover { case _ => Done }(ExecutionContext.parasitic) case None => FutureDone } pendingWrite.flatMap { _ => diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala index e49c258b..6093ddda 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/javadsl/R2dbcReadJournal.scala @@ -9,11 +9,11 @@ import java.util import java.util.Optional import java.util.concurrent.CompletionStage -import scala.compat.java8.OptionConverters._ -import scala.compat.java8.FutureConverters._ +import scala.concurrent.ExecutionContext +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ import akka.NotUsed -import akka.dispatch.ExecutionContexts import akka.japi.Pair import akka.persistence.query.{ EventEnvelope => ClassicEventEnvelope } import akka.persistence.query.Offset @@ -152,7 +152,7 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) delegate.eventsBySlicesStartingFromSnapshots(entityType, minSlice, maxSlice, offset, transformSnapshot(_)).asJava override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer, Integer]] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ delegate .sliceRanges(numberOfRanges) .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) @@ -232,9 +232,9 @@ final class R2dbcReadJournal(delegate: scaladsl.R2dbcReadJournal) delegate.currentPersistenceIds(entityType, afterId, limit).asJava override def timestampOf(persistenceId: String, sequenceNr: Long): CompletionStage[Optional[Instant]] = - delegate.timestampOf(persistenceId, sequenceNr).map(_.asJava)(ExecutionContexts.parasitic).toJava + delegate.timestampOf(persistenceId, sequenceNr).map(_.toJava)(ExecutionContext.parasitic).asJava override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): CompletionStage[EventEnvelope[Event]] = - delegate.loadEnvelope[Event](persistenceId, sequenceNr).toJava + delegate.loadEnvelope[Event](persistenceId, sequenceNr).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala 
b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala index 5dfc49b5..a6758ce1 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala @@ -15,7 +15,6 @@ import scala.concurrent.duration.FiniteDuration import akka.NotUsed import akka.actor.ExtendedActorSystem import akka.actor.typed.pubsub.Topic -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.annotation.ApiMayChange import akka.annotation.InternalApi @@ -525,7 +524,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat latestBacktracking = t.timestamp env :: Nil } else if (EnvelopeOrigin.fromPubSub(env) && latestBacktracking == Instant.EPOCH) { - log.trace2( + log.trace( "Dropping pubsub event for persistenceId [{}] seqNr [{}] because no event from backtracking yet.", env.persistenceId, env.sequenceNr) @@ -534,7 +533,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat .between(latestBacktracking, t.timestamp) .compareTo(maxAheadOfBacktracking) > 0) { // drop from pubsub when too far ahead from backtracking - log.debug2( + log.debug( "Dropping pubsub event for persistenceId [{}] seqNr [{}] because too far ahead of backtracking.", env.persistenceId, env.sequenceNr) @@ -584,7 +583,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) if (state.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "currentEventsByPersistenceId query [{}] for persistenceId [{}], from [{}] to [{}]. Found [{}] rows in previous query.", state.queryCount, persistenceId, @@ -596,7 +595,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat queryDao .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, highestSeqNr, includeDeleted)) } else { - log.debugN( + log.debug( "currentEventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.", state.queryCount, persistenceId, @@ -607,7 +606,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat } if (log.isDebugEnabled()) - log.debugN( + log.debug( "currentEventsByPersistenceId query for persistenceId [{}], from [{}] to [{}].", persistenceId, fromSequenceNr, @@ -680,7 +679,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat settings.querySettings.refreshInterval) delay.foreach { d => - log.debugN( + log.debug( "eventsByPersistenceId query [{}] for persistenceId [{}] delay next [{}] ms.", state.queryCount, persistenceId, @@ -693,7 +692,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat def nextQuery( state: ByPersistenceIdState): (ByPersistenceIdState, Option[Source[SerializedJournalRow, NotUsed]]) = { if (state.latestSeqNr >= toSequenceNr) { - log.debugN( + log.debug( "eventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.", state.queryCount, persistenceId, @@ -702,7 +701,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat } else { val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) - log.debugN( + log.debug( "eventsByPersistenceId query [{}] for persistenceId [{}], from [{}]. 
Found [{}] rows in previous query.", newState.queryCount, persistenceId, @@ -809,7 +808,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, config: Config, cfgPat val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1) if (state.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "persistenceIds query [{}] after [{}]. Found [{}] rows in previous query.", state.queryCount, state.latestPid, diff --git a/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala b/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala index e86cddc9..f8ae2290 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/session/javadsl/R2dbcSession.scala @@ -8,10 +8,10 @@ import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.{ Function => JFunction } -import scala.collection.JavaConverters._ -import scala.compat.java8.FutureConverters._ -import scala.compat.java8.OptionConverters._ import scala.concurrent.ExecutionContext +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ +import scala.jdk.OptionConverters._ import akka.actor.typed.ActorSystem import akka.annotation.ApiMayChange @@ -39,9 +39,9 @@ object R2dbcSession { fun: JFunction[R2dbcSession, CompletionStage[A]]): CompletionStage[A] = { scaladsl.R2dbcSession.withSession(system, connectionFactoryConfigPath) { scaladslSession => val javadslSession = new R2dbcSession(scaladslSession.connection)(system.executionContext, system) - fun(javadslSession).toScala + fun(javadslSession).asScala } - }.toJava + }.asJava } @@ -52,18 +52,18 @@ final class R2dbcSession(val connection: Connection)(implicit ec: ExecutionConte connection.createStatement(sql) def updateOne(statement: Statement): CompletionStage[java.lang.Long] = - R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContexts.parasitic).toJava + R2dbcExecutor.updateOneInTx(statement).map(java.lang.Long.valueOf)(ExecutionContext.parasitic).asJava def update(statements: java.util.List[Statement]): CompletionStage[java.util.List[java.lang.Long]] = R2dbcExecutor .updateInTx(statements.asScala.toVector) .map(results => results.map(java.lang.Long.valueOf).asJava) - .toJava + .asJava def selectOne[A](statement: Statement)(mapRow: JFunction[Row, A]): CompletionStage[Optional[A]] = - R2dbcExecutor.selectOneInTx(statement, mapRow(_)).map(_.asJava)(ExecutionContexts.parasitic).toJava + R2dbcExecutor.selectOneInTx(statement, mapRow(_)).map(_.toJava)(ExecutionContext.parasitic).asJava def select[A](statement: Statement)(mapRow: JFunction[Row, A]): CompletionStage[java.util.List[A]] = - R2dbcExecutor.selectInTx(statement, mapRow(_)).map(_.asJava).toJava + R2dbcExecutor.selectInTx(statement, mapRow(_)).map(_.asJava).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala index 40b3f20d..29d938fe 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/javadsl/R2dbcDurableStateStore.scala @@ -9,6 +9,7 @@ import java.util.Optional import java.util.concurrent.CompletionStage import scala.concurrent.ExecutionContext +import scala.jdk.FutureConverters.FutureOps import akka.Done import akka.NotUsed @@ -20,7 +21,6 @@ import 
akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery import akka.persistence.r2dbc.state.scaladsl.{ R2dbcDurableStateStore => ScalaR2dbcDurableStateStore } import akka.persistence.state.javadsl.GetObjectResult import akka.stream.javadsl.Source -import scala.compat.java8.FutureConverters.FutureOps import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore @@ -37,10 +37,10 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl scalaStore .getObject(persistenceId) .map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision)) - .toJava + .asJava override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = - scalaStore.upsertObject(persistenceId, revision, value, tag).toJava + scalaStore.upsertObject(persistenceId, revision, value, tag).asJava override def upsertObject( persistenceId: String, @@ -48,17 +48,17 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl value: A, tag: String, changeEvent: Any): CompletionStage[Done] = - scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).toJava + scalaStore.upsertObject(persistenceId, revision, value, tag, changeEvent).asJava @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): CompletionStage[Done] = deleteObject(persistenceId, revision = 0) override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] = - scalaStore.deleteObject(persistenceId, revision).toJava + scalaStore.deleteObject(persistenceId, revision).asJava override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): CompletionStage[Done] = - scalaStore.deleteObject(persistenceId, revision, changeEvent).toJava + scalaStore.deleteObject(persistenceId, revision, changeEvent).asJava override def currentChangesBySlices( entityType: String, @@ -78,7 +78,7 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl scalaStore.sliceForPersistenceId(persistenceId) override def sliceRanges(numberOfRanges: Int): util.List[Pair[Integer, Integer]] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ scalaStore .sliceRanges(numberOfRanges) .map(range => Pair(Integer.valueOf(range.min), Integer.valueOf(range.max))) @@ -103,12 +103,12 @@ class R2dbcDurableStateStore[A](scalaStore: ScalaR2dbcDurableStateStore[A])(impl * A source containing all the persistence ids, limited as specified. 
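   * The converter swaps in the javadsl wrappers follow the scala.jdk naming
   * convention: Future/CompletionStage bridges use asJava/asScala
   * (scala.jdk.FutureConverters), Option/Optional bridges use toJava/toScala
   * (scala.jdk.OptionConverters), and collections keep asJava/asScala
   * (scala.jdk.CollectionConverters), replacing scala.compat.java8 and
   * akka.util.ccompat. A minimal sketch with hypothetical values:
   *
   * {{{
   * import scala.jdk.FutureConverters._
   * import scala.jdk.OptionConverters._
   *
   * val cs: java.util.concurrent.CompletionStage[String] =
   *   scala.concurrent.Future.successful("ok").asJava
   * val opt: Option[String] = java.util.Optional.of("x").toScala
   * }}}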
*/ def currentPersistenceIds(entityType: String, afterId: Optional[String], limit: Long): Source[String, NotUsed] = { - import scala.compat.java8.OptionConverters._ + import scala.jdk.OptionConverters._ scalaStore.currentPersistenceIds(entityType, afterId.asScala, limit).asJava } override def currentPersistenceIds(afterId: Optional[String], limit: Long): Source[String, NotUsed] = { - import scala.compat.java8.OptionConverters._ + import scala.jdk.OptionConverters._ scalaStore.currentPersistenceIds(afterId.asScala, limit).asJava } diff --git a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala index 110599c8..995bfa61 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala @@ -14,7 +14,6 @@ import scala.concurrent.Future import akka.Done import akka.NotUsed import akka.actor.ExtendedActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.adapter._ import akka.persistence.Persistence import akka.persistence.SerializedEvent @@ -353,7 +352,7 @@ class R2dbcDurableStateStore[A](system: ExtendedActorSystem, config: Config, cfg val newState2 = newState.copy(rowCount = 0, queryCount = newState.queryCount + 1) if (newState.queryCount != 0 && log.isDebugEnabled()) - log.debugN( + log.debug( "persistenceIds query [{}] after [{}]. Found [{}] rows in previous query.", newState.queryCount, newState.latestPid, diff --git a/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala b/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala index 57afa503..65ea53d3 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/TestActors.scala @@ -7,7 +7,6 @@ package akka.persistence.r2dbc import akka.Done import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.{ ActorRef, Behavior } import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId @@ -92,14 +91,14 @@ object TestActors { { (state, command) => command match { case command: Persist => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, EventSourcedBehavior.lastSequenceNumber(context) + 1) Effect.persist(command.payload) case command: PersistWithAck => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, @@ -107,7 +106,7 @@ object TestActors { Effect.persist(command.payload).thenRun(_ => command.replyTo ! 
Done) case command: PersistAll => if (context.log.isDebugEnabled) - context.log.debugN( + context.log.debug( "PersistAll [{}], pid [{}], seqNr [{}]", command.payloads.mkString(","), pid.id, @@ -157,14 +156,14 @@ object TestActors { { (state, command) => command match { case command: Persist => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, DurableStateBehavior.lastSequenceNumber(context) + 1) Effect.persist(command.payload) case command: PersistWithAck => - context.log.debugN( + context.log.debug( "Persist [{}], pid [{}], seqNr [{}]", command.payload, pid.id, diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala index 6e8dd420..16f26b6d 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala @@ -16,7 +16,6 @@ import org.slf4j.LoggerFactory import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorSystem -import akka.actor.typed.scaladsl.LoggerOps import akka.persistence.query.NoOffset import akka.persistence.query.Offset import akka.persistence.query.PersistenceQuery @@ -69,7 +68,7 @@ class EventsBySliceBacktrackingSpec // to be able to store events with specific timestamps private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = { - log.debugN("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) + log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) val insertEventSql = sql""" INSERT INTO ${settings.journalTableWithSchema(slice)} (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload) diff --git a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala index dd75ada3..c87aebb6 100644 --- a/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala +++ b/core/src/test/scala/akka/persistence/r2dbc/query/EventsBySliceSpec.scala @@ -12,7 +12,6 @@ import akka.Done import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.{ ActorRef, ActorSystem } import akka.persistence.FilteredPayload import akka.persistence.query.NoOffset @@ -98,7 +97,7 @@ class EventsBySliceSpec // to be able to store events with specific timestamps private def writeEvent(slice: Int, persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = { - log.debugN("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) + log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp) val insertEventSql = sql""" INSERT INTO ${settings.journalTableWithSchema(slice)} diff --git a/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java index 542f360f..8ee8e7cf 100644 --- a/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java +++ b/docs/src/test/java/jdocs/home/cleanup/CleanupDocExample.java @@ -9,7 +9,8 @@ 
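// For Java callers the same migration uses the static-method flavour of the
// converters, scala.jdk.javaapi.FutureConverters, in place of
// scala.compat.java8.FutureConverters. A minimal sketch, assuming a
// scala.concurrent.Future<Done> named scalaFuture:
//
//   import scala.jdk.javaapi.FutureConverters;
//
//   CompletionStage<Done> cs = FutureConverters.asJava(scalaFuture);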
import akka.persistence.query.javadsl.CurrentPersistenceIdsQuery; import akka.persistence.r2dbc.cleanup.scaladsl.EventSourcedCleanup; import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal; -import scala.compat.java8.FutureConverters; + +import scala.jdk.javaapi.FutureConverters; public class CleanupDocExample { @@ -30,7 +31,7 @@ public static void example() { queries .currentPersistenceIds() .mapAsync(persistenceIdParallelism, pid -> - FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid))) + FutureConverters.asJava(cleanup.cleanupBeforeSnapshot(pid))) .run(system); //#cleanup diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala index 69fcaad7..56028870 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationTool.scala @@ -5,19 +5,16 @@ package akka.persistence.r2dbc.migration import java.time.Instant - +import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.Done import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.LoggerOps -import akka.dispatch.ExecutionContexts import akka.pattern.ask import akka.persistence.Persistence import akka.persistence.SelectedSnapshot @@ -45,7 +42,6 @@ import akka.stream.scaladsl.Sink import akka.util.Timeout import io.r2dbc.spi.R2dbcDataIntegrityViolationException import org.slf4j.LoggerFactory - import akka.persistence.r2dbc.internal.DurableStateDao.SerializedStateRow import akka.persistence.r2dbc.internal.R2dbcExecutorProvider import akka.persistence.state.DurableStateStoreRegistry @@ -198,7 +194,7 @@ class MigrationTool(system: ActorSystem[_]) { } yield persistenceId -> Result(1, eventCount, snapshotCount) } .map { case (pid, result @ Result(_, events, snapshots)) => - log.debugN( + log.debug( "Migrated persistenceId [{}] with [{}] events{}.", pid, events, @@ -208,7 +204,7 @@ class MigrationTool(system: ActorSystem[_]) { .runWith(Sink.fold(Result.empty) { case (acc, Result(_, events, snapshots)) => val result = Result(acc.persistenceIds + 1, acc.events + events, acc.snapshots + snapshots) if (result.persistenceIds % 100 == 0) - log.infoN( + log.info( "Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.", result.persistenceIds, result.events, @@ -218,7 +214,7 @@ class MigrationTool(system: ActorSystem[_]) { result.transform { case s @ Success(Result(persistenceIds, events, snapshots)) => - log.infoN( + log.info( "Migration successful. 
Migrated [{}] persistenceIds with [{}] events and [{}] snapshots.", persistenceIds, events, @@ -331,7 +327,7 @@ class MigrationTool(system: ActorSystem[_]) { val serializedRow = serializedSnapotRow(selectedSnapshot) targetSnapshotDao .store(serializedRow) - .map(_ => snapshotMetadata.sequenceNr)(ExecutionContexts.parasitic) + .map(_ => snapshotMetadata.sequenceNr)(ExecutionContext.parasitic) } _ <- migrationDao.updateSnapshotProgress(persistenceId, seqNr) } yield 1 @@ -399,22 +395,19 @@ class MigrationTool(system: ActorSystem[_]) { } yield persistenceId -> DurableStateResult(1, stateCount) } .map { case (pid, result @ DurableStateResult(_, states)) => - log.debugN("Migrated persistenceId [{}] with [{}] durable state.", pid, states) + log.debug("Migrated persistenceId [{}] with [{}] durable state.", pid, states) result } .runWith(Sink.fold(DurableStateResult.empty) { case (acc, DurableStateResult(_, states)) => val result = DurableStateResult(acc.persistenceIds + 1, acc.states + states) if (result.persistenceIds % 100 == 0) - log.infoN("Migrated [{}] persistenceIds with [{}] durable states.", result.persistenceIds, result.states) + log.info("Migrated [{}] persistenceIds with [{}] durable states.", result.persistenceIds, result.states) result }) result.transform { case s @ Success(DurableStateResult(persistenceIds, states)) => - log.infoN( - "Migration successful. Migrated [{}] persistenceIds with [{}] durable states.", - persistenceIds, - states) + log.info("Migration successful. Migrated [{}] persistenceIds with [{}] durable states.", persistenceIds, states) s case f @ Failure(exc) => log.error("Migration failed.", exc) @@ -443,7 +436,7 @@ class MigrationTool(system: ActorSystem[_]) { val serializedRow = serializedDurableStateRow(selectedDurableState) durableStateMigrationToolDao .upsertState(serializedRow, selectedDurableState.value, None) - .map(_ => selectedDurableState.revision)(ExecutionContexts.parasitic) + .map(_ => selectedDurableState.revision)(ExecutionContext.parasitic) } _ <- migrationDao.updateDurableStateProgress(persistenceId, revision) } yield 1 diff --git a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala index 87bfd2b9..ebe37008 100644 --- a/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala +++ b/migration/src/main/scala/akka/persistence/r2dbc/migration/MigrationToolDao.scala @@ -95,7 +95,7 @@ import akka.persistence.typed.PersistenceId val stmt = connection.createStatement(baseUpsertMigrationProgressSql("event_seq_nr")) bindBaseUpsertSql(stmt, persistenceId, seqNr) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def updateSnapshotProgress(persistenceId: String, seqNr: Long): Future[Done] = { @@ -104,7 +104,7 @@ import akka.persistence.typed.PersistenceId val stmt = connection.createStatement(baseUpsertMigrationProgressSql("snapshot_seq_nr")) bindBaseUpsertSql(stmt, persistenceId, seqNr) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def updateDurableStateProgress(persistenceId: String, revision: Long): Future[Done] = { @@ -113,7 +113,7 @@ import akka.persistence.typed.PersistenceId val stmt = connection.createStatement(baseUpsertMigrationProgressSql("state_revision")) bindBaseUpsertSql(stmt, persistenceId, revision) } - .map(_ => Done)(ExecutionContexts.parasitic) + .map(_ => Done)(ExecutionContext.parasitic) } def 
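// The LoggerOps removals throughout this patch (debugN, infoN, debug2, trace2)
// rely on the plain org.slf4j.Logger varargs overloads, which the SLF4J 2 API
// used by Akka 2.10 supports directly, so the Akka-specific extension methods
// are no longer needed. A minimal sketch, with hypothetical values pid, events
// and snapshots:
//
//   import org.slf4j.LoggerFactory
//
//   val log = LoggerFactory.getLogger(classOf[MigrationTool])
//   log.debug("Migrated persistenceId [{}] with [{}] events and [{}] snapshots.", pid, events, snapshots)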
currentProgress(persistenceId: String): Future[Option[CurrentProgress]] = { diff --git a/native-image-tests/build.sbt b/native-image-tests/build.sbt index 72a6f11d..acfe20c1 100644 --- a/native-image-tests/build.sbt +++ b/native-image-tests/build.sbt @@ -6,7 +6,7 @@ scalaVersion := "2.13.14" resolvers += "Akka library repository".at("https://repo.akka.io/maven") -lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.9.4") +lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.10.0-M1") lazy val akkaR2dbcVersion = sys.props.getOrElse("akka.r2dbc.version", "1.2.3") fork := true @@ -28,7 +28,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, "com.lightbend.akka" %% "akka-persistence-r2dbc" % akkaR2dbcVersion, - "ch.qos.logback" % "logback-classic" % "1.2.13", + "ch.qos.logback" % "logback-classic" % "1.5.7", // H2 "com.h2database" % "h2" % "2.2.224", "io.r2dbc" % "r2dbc-h2" % "1.0.0.RELEASE") diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7b3d44c4..6d04dce7 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,9 +9,9 @@ object Dependencies { val Scala3 = "3.3.3" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.4") + val AkkaVersion = System.getProperty("override.akka.version", "2.10.0-M1") val AkkaVersionInDocs = VersionNumber(AkkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } - val AkkaPersistenceJdbcVersion = "5.4.0" // only in migration tool tests + val AkkaPersistenceJdbcVersion = "5.5.0-M1" // only in migration tool tests val AkkaProjectionVersionInDocs = "current" val H2Version = "2.2.224" val R2dbcH2Version = "1.0.0.RELEASE" @@ -48,7 +48,7 @@ object Dependencies { val postgresql = "org.postgresql" % "postgresql" % "42.7.3" % Test // BSD-2-Clause - val logback = "ch.qos.logback" % "logback-classic" % "1.2.13" % Test // EPL 1.0 / LGPL 2.1 + val logback = "ch.qos.logback" % "logback-classic" % "1.5.7" % Test // EPL 1.0 / LGPL 2.1 val scalaTest = "org.scalatest" %% "scalatest" % "3.2.12" % Test // ApacheV2 val junit = "junit" % "junit" % "4.12" % Test // Eclipse Public License 1.0 val junitInterface = "com.novocode" % "junit-interface" % "0.11" % Test // "BSD 2-Clause" @@ -75,9 +75,7 @@ object Dependencies { val migrationTests = Seq( - ("com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion % Test) - // Unsupported SLF4J 2 transitively pulled in by Slick 3.5.0 - .exclude("org.slf4j", "slf4j-api"), + "com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion % Test, "com.microsoft.sqlserver" % "mssql-jdbc" % SqlServerJdbcVersion % Test, TestDeps.postgresql, TestDeps.logback,
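// logback-classic 1.5.x is built on the SLF4J 2 API, which is what lets this
// patch both bump Logback (1.2.13 -> 1.5.7) and drop the slf4j-api exclusion
// that akka-persistence-jdbc previously needed: with Akka 2.10.0-M1 the whole
// dependency tree is expected to be on SLF4J 2. The simplified dependency, as
// declared above:
//
//   "com.lightbend.akka" %% "akka-persistence-jdbc" % AkkaPersistenceJdbcVersion % Test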