diff --git a/.github/workflows/native-image-tests.yml b/.github/workflows/native-image-tests.yml index a9a3fc37..c9f22b75 100644 --- a/.github/workflows/native-image-tests.yml +++ b/.github/workflows/native-image-tests.yml @@ -47,12 +47,30 @@ jobs: run: |- sbt "publishLocal" - #- name: Akka Persistence R2DBC native image test app build - # run: |- - # cd native-image-tests/ - # sbt nativeImage -Dakka.http.version=`cat ~/.version` - # run the binary - # target/native-image/native-image-tests + - name: Akka Persistence R2DBC native image test app build + run: |- + cd native-image-tests/ + sbt nativeImage -Dakka.r2dbc.version=`cat ~/.version` + + - name: Akka Persistence native image H2 inmem + run: |- + cd native-image-tests/ + target/native-image/native-image-tests + + - name: Akka Persistence native image H2 file + run: |- + cd native-image-tests/ + target/native-image/native-image-tests -Dconfig.resource=application-h2-file.conf + + - name: Start Postgres DB + run: |- + docker compose -f docker/docker-compose-postgres.yml up --wait + docker exec -i postgres-db psql -U postgres -t < ddl-scripts/create_tables_postgres.sql + + - name: Akka Persistence native image H2 + run: |- + cd native-image-tests/ + target/native-image/native-image-tests -Dconfig.resource=application-postgres.conf - name: Email on failure if: ${{ failure() }} diff --git a/core/src/main/resources/META-INF/native-image/com.lightbend.akka/akka-persistence-r2dbc/reflect-config.json b/core/src/main/resources/META-INF/native-image/com.lightbend.akka/akka-persistence-r2dbc/reflect-config.json new file mode 100644 index 00000000..6e247736 --- /dev/null +++ b/core/src/main/resources/META-INF/native-image/com.lightbend.akka/akka-persistence-r2dbc/reflect-config.json @@ -0,0 +1,53 @@ +[ + { + "name": "akka.persistence.r2dbc.journal.R2dbcJournal", + "methods": [ + { + "name": "", + "parameterTypes": [ + "com.typesafe.config.Config", + "java.lang.String" + ] + } + ] + }, + { + "name": "akka.persistence.r2dbc.query.R2dbcReadJournalProvider", + "methods": [ + { + "name": "", + "parameterTypes": [ + "akka.actor.ExtendedActorSystem", + "com.typesafe.config.Config", + "java.lang.String" + ] + } + ] + }, + { + "name": "akka.persistence.r2dbc.snapshot.R2dbcSnapshotStore", + "methods": [ + { + "name": "", + "parameterTypes": [ + "com.typesafe.config.Config", + "java.lang.String" + ] + } + ] + }, + { + "name": "akka.persistence.r2dbc.state.R2dbcDurableStateStoreProvider", + "methods": [ + { + "name": "", + "parameterTypes": [ + "akka.actor.ExtendedActorSystem", + "com.typesafe.config.Config", + "java.lang.String" + ] + } + ] + } + +] \ No newline at end of file diff --git a/core/src/main/resources/META-INF/native-image/io.netty/netty-handler/native-image.properties b/core/src/main/resources/META-INF/native-image/io.netty/netty-handler/native-image.properties new file mode 100644 index 00000000..755a25a1 --- /dev/null +++ b/core/src/main/resources/META-INF/native-image/io.netty/netty-handler/native-image.properties @@ -0,0 +1 @@ +Args = --initialize-at-run-time=io.netty.handler.ssl.BouncyCastleAlpnSslUtils \ No newline at end of file diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md index 76aa2128..a5f52b1c 100644 --- a/docs/src/main/paradox/index.md +++ b/docs/src/main/paradox/index.md @@ -19,6 +19,7 @@ The Akka Persistence R2DBC plugin allows for using SQL database with R2DBC as a * [Cleanup tool](cleanup.md) * [Migration tool](migration.md) * [Migration Guide](migration-guide.md) +* [Native Image](native-image.md) * [Contributing](contributing.md) @@@ diff --git a/docs/src/main/paradox/native-image.md b/docs/src/main/paradox/native-image.md new file mode 100644 index 00000000..d5dce223 --- /dev/null +++ b/docs/src/main/paradox/native-image.md @@ -0,0 +1,11 @@ +# Building Native Images + +Building native images with Akka Persistence R2DBC is supported out of the box for the event sourced journal, snapshot store and +durable state store with the following databases: + +* H2 (inmem and file) +* Postgres + +Other databases can likely be used but will require figuring out and adding additional native-image metadata. + +For details about building native images with Akka in general, see the @extref[Akka Documentation](akka:additional/native-image.html). \ No newline at end of file diff --git a/native-image-tests/.gitignore b/native-image-tests/.gitignore new file mode 100644 index 00000000..c4ddf444 --- /dev/null +++ b/native-image-tests/.gitignore @@ -0,0 +1,12 @@ +target/ + +.settings +.project +.classpath + +.idea +*.iml + +.metals +.bloop +.bsp diff --git a/native-image-tests/build.sbt b/native-image-tests/build.sbt new file mode 100644 index 00000000..944a5df3 --- /dev/null +++ b/native-image-tests/build.sbt @@ -0,0 +1,34 @@ +name := "native-image-tests" + +version := "1.0" + +scalaVersion := "2.13.12" + +resolvers += "Akka library repository".at("https://repo.akka.io/maven") + +lazy val akkaVersion = sys.props.getOrElse("akka.version", "2.9.2") +lazy val akkaR2dbcVersion = sys.props.getOrElse("akka.r2dbc.version", "1.2.3") + +fork := true + +// GraalVM native image build +enablePlugins(NativeImagePlugin) +nativeImageJvm := "graalvm-community" +nativeImageVersion := "21.0.2" +nativeImageOptions := Seq( + "--no-fallback", + "--verbose", + "--initialize-at-build-time=ch.qos.logback", + "-Dakka.native-image.debug=true") + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-persistence-typed" % akkaVersion, + "com.typesafe.akka" %% "akka-persistence-query" % akkaVersion, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, + "com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion, + "com.lightbend.akka" %% "akka-persistence-r2dbc" % akkaR2dbcVersion, + "ch.qos.logback" % "logback-classic" % "1.2.13", + // H2 + "com.h2database" % "h2" % "2.2.224", + "io.r2dbc" % "r2dbc-h2" % "1.0.0.RELEASE") diff --git a/native-image-tests/project/build.properties b/native-image-tests/project/build.properties new file mode 100644 index 00000000..abbbce5d --- /dev/null +++ b/native-image-tests/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.8 diff --git a/native-image-tests/project/plugins.sbt b/native-image-tests/project/plugins.sbt new file mode 100644 index 00000000..f28fde7a --- /dev/null +++ b/native-image-tests/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("org.scalameta" % "sbt-native-image" % "0.3.4") diff --git a/native-image-tests/src/main/resources/application-h2-file.conf b/native-image-tests/src/main/resources/application-h2-file.conf new file mode 100644 index 00000000..0c68654a --- /dev/null +++ b/native-image-tests/src/main/resources/application-h2-file.conf @@ -0,0 +1,9 @@ +akka.persistence.journal.plugin = "akka.persistence.r2dbc.journal" +akka.persistence.snapshot-store.plugin = "akka.persistence.r2dbc.snapshot" +akka.persistence.state.plugin = "akka.persistence.r2dbc.state" + +akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.h2} +akka.persistence.r2dbc.connection-factory = { + protocol = "file" + database = "/tmp/test-h2-database" +} diff --git a/native-image-tests/src/main/resources/application-postgres.conf b/native-image-tests/src/main/resources/application-postgres.conf new file mode 100644 index 00000000..6ae182fe --- /dev/null +++ b/native-image-tests/src/main/resources/application-postgres.conf @@ -0,0 +1,5 @@ +akka.persistence.journal.plugin = "akka.persistence.r2dbc.journal" +akka.persistence.snapshot-store.plugin = "akka.persistence.r2dbc.snapshot" +akka.persistence.state.plugin = "akka.persistence.r2dbc.state" + +akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.postgres} diff --git a/native-image-tests/src/main/resources/application.conf b/native-image-tests/src/main/resources/application.conf new file mode 100644 index 00000000..8e21e2ad --- /dev/null +++ b/native-image-tests/src/main/resources/application.conf @@ -0,0 +1,10 @@ +akka.persistence.journal.plugin = "akka.persistence.r2dbc.journal" +akka.persistence.snapshot-store.plugin = "akka.persistence.r2dbc.snapshot" +akka.persistence.state.plugin = "akka.persistence.r2dbc.state" + +akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.h2} +akka.persistence.r2dbc.connection-factory = { + # overrides for default values from the 'akka.persistence.r2dbc.h2' config block + protocol = "mem" + database = "mydb" +} diff --git a/native-image-tests/src/main/resources/logback.xml b/native-image-tests/src/main/resources/logback.xml new file mode 100644 index 00000000..4b6e3608 --- /dev/null +++ b/native-image-tests/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + [%date{ISO8601}] [%level] [%logger] [%X{akkaAddress}] [%marker] [%thread] - %msg%n + + + + + + + + diff --git a/native-image-tests/src/main/scala/com/lightbend/DurableStateTester.scala b/native-image-tests/src/main/scala/com/lightbend/DurableStateTester.scala new file mode 100644 index 00000000..b295d376 --- /dev/null +++ b/native-image-tests/src/main/scala/com/lightbend/DurableStateTester.scala @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2009-2024 Lightbend Inc. + */ +package com.lightbend + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.state.scaladsl.DurableStateBehavior +import akka.persistence.typed.state.scaladsl.Effect +import akka.serialization.jackson.JsonSerializable + +import scala.concurrent.duration.DurationInt + +object DurableStateCounter { + sealed trait Command extends JsonSerializable + final case class Increase(amount: Int, replyTo: ActorRef[Increased]) extends Command + + final case class GetState(replyTo: ActorRef[State]) extends Command + + final case class Increased(newValue: Int) extends JsonSerializable + + final case class State(value: Int) extends JsonSerializable + def apply(id: String): Behavior[Command] = + DurableStateBehavior[Command, State]( + PersistenceId("DSCounter", id), + State(0), + { + case (state, Increase(amount, replyTo)) => + Effect.persist(State(state.value + amount)).thenReply(replyTo)(newState => Increased(newState.value)) + case (state, GetState(replyTo)) => + Effect.reply(replyTo)(state) + }) +} + +object DurableStateTester { + + def apply(whenDone: ActorRef[String]): Behavior[AnyRef] = Behaviors.setup { context => + Behaviors.withTimers { timers => + timers.startSingleTimer("Timeout", 10.seconds) + + var durableActor = context.spawn(DurableStateCounter("one"), "DurableOne") + context.watchWith(durableActor, "DurableOneStopped") + + def messageOrTimeout(step: String)(partial: PartialFunction[AnyRef, Behavior[AnyRef]]): Behavior[AnyRef] = { + context.log.info("On {}", step) + Behaviors.receiveMessage(message => + partial.orElse[AnyRef, Behavior[AnyRef]] { + case "Timeout" => + context.log.error(s"Durable state checks timed out in {}", step) + System.exit(1) + Behaviors.same + + case other => + context.log.warn("Unexpected message in {}: {}", step, other) + Behaviors.same + }(message)) + } + + durableActor ! DurableStateCounter.Increase(1, context.self) + + def step1() = messageOrTimeout("step1") { case DurableStateCounter.Increased(1) => + // write works + context.stop(durableActor) + step2() + } + + def step2() = messageOrTimeout("step2") { case "DurableOneStopped" => + durableActor = context.spawn(DurableStateCounter("one"), "DurableOneIncarnation2") + durableActor ! DurableStateCounter.GetState(context.self) + step3() + } + + def step3() = messageOrTimeout("step3") { case DurableStateCounter.State(1) => + whenDone ! "Durable State works" + Behaviors.stopped + } + + step1() + } + } + +} diff --git a/native-image-tests/src/main/scala/com/lightbend/EsbTester.scala b/native-image-tests/src/main/scala/com/lightbend/EsbTester.scala new file mode 100644 index 00000000..37d3acfc --- /dev/null +++ b/native-image-tests/src/main/scala/com/lightbend/EsbTester.scala @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2009-2024 Lightbend Inc. + */ +package com.lightbend + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.pattern.StatusReply +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.serialization.jackson.JsonSerializable + +import scala.concurrent.duration.DurationInt + +object EventSourcedCounter { + sealed trait Command extends JsonSerializable + + final case class Increase(amount: Int, replyTo: ActorRef[StatusReply[Increased]]) extends Command + final case class GetValue(replyTo: ActorRef[StatusReply[GetValueResponse]]) extends Command + final case class GetValueResponse(value: Int) + + sealed trait Event extends JsonSerializable + + final case class Increased(amount: Int) extends Event + + final case class State(value: Int) extends JsonSerializable + + def apply(id: String): Behavior[Command] = EventSourcedBehavior[Command, Event, State]( + PersistenceId("EventSourcedHelloWorld", id), + State(0), + { (state, command) => + command match { + case Increase(increment, replyTo) => + val increased = Increased(increment) + Effect.persist(increased).thenReply(replyTo)(_ => StatusReply.success(increased)) + case GetValue(replyTo) => + Effect.reply(replyTo)(StatusReply.success(GetValueResponse(state.value))) + } + }, + { (_, event) => + event match { + case Increased(newGreeting) => State(newGreeting) + } + }).snapshotWhen((_, _, seqNr) => seqNr % 2 == 0) +} + +object EsbTester { + + object EsbStopped + + def apply(whenDone: ActorRef[String]): Behavior[AnyRef] = Behaviors.setup { context => + Behaviors.withTimers { timers => + + timers.startSingleTimer("Timeout", 10.seconds) + + var eventSourcedHelloWorld = context.spawn(EventSourcedCounter("one"), "EsbOne") + context.watchWith(eventSourcedHelloWorld, EsbStopped) + eventSourcedHelloWorld ! EventSourcedCounter.Increase(1, context.self) + + def messageOrTimeout(step: String)(partial: PartialFunction[AnyRef, Behavior[AnyRef]]): Behavior[AnyRef] = { + context.log.info("On {}", step) + Behaviors.receiveMessage(message => + partial.orElse[AnyRef, Behavior[AnyRef]] { + case "Timeout" => + context.log.error(s"ESB checks timed out in {}", step) + System.exit(1) + Behaviors.same + + case other => + context.log.warn("Unexpected message in {}: {}", step, other) + Behaviors.same + }(message)) + } + + def step1() = messageOrTimeout("step1") { case StatusReply.Success(EventSourcedCounter.Increased(1)) => + eventSourcedHelloWorld ! EventSourcedCounter.Increase(2, context.self) + step2() + } + + def step2() = + messageOrTimeout("step2") { case StatusReply.Success(EventSourcedCounter.Increased(2)) => + // triggers snapshot + eventSourcedHelloWorld ! EventSourcedCounter.Increase(2, context.self) + step3() + } + + def step3() = + messageOrTimeout("step3") { case StatusReply.Success(EventSourcedCounter.Increased(2)) => + eventSourcedHelloWorld ! EventSourcedCounter.GetValue(context.self) + step4() + } + + def step4() = messageOrTimeout("step4") { case StatusReply.Success(EventSourcedCounter.GetValueResponse(2)) => + context.stop(eventSourcedHelloWorld) + step5() + } + + def step5() = messageOrTimeout("step5") { case EsbStopped => + // start anew to trigger replay + eventSourcedHelloWorld = context.spawn(EventSourcedCounter("one"), "EsbOneIncarnation2") + eventSourcedHelloWorld ! EventSourcedCounter.GetValue(context.self) + step6() + + } + + def step6() = messageOrTimeout("step6") { case StatusReply.Success(EventSourcedCounter.GetValueResponse(2)) => + // replay was fine + whenDone ! "ESB works" + Behaviors.stopped + } + + step1() + } + } +} diff --git a/native-image-tests/src/main/scala/com/lightbend/Main.scala b/native-image-tests/src/main/scala/com/lightbend/Main.scala new file mode 100644 index 00000000..6ceeeaeb --- /dev/null +++ b/native-image-tests/src/main/scala/com/lightbend/Main.scala @@ -0,0 +1,45 @@ +package com.lightbend + +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors + +import scala.concurrent.duration.DurationInt + +object RootBehavior { + def apply(): Behavior[AnyRef] = Behaviors.setup { context => + Behaviors.withTimers { timers => + timers.startSingleTimer("Timeout", 30.seconds) + context.spawn(EsbTester(context.self), "ESBTester") + context.spawn(DurableStateTester(context.self), "DurableStateTester") + + var awaitedOks = Set("ESB works", "Durable State works") + + Behaviors.receiveMessage { + case "Timeout" => + context.log.error("Suite of checks timed out, missing awaitedOks: {}", awaitedOks) + System.exit(1) + Behaviors.same + + case string: String => + awaitedOks -= string + if (awaitedOks.isEmpty) { + context.log.info("All checks ok, shutting down") + Behaviors.stopped + } else { + context.log.info("Continuing, awaitedOks not empty: {}", awaitedOks) + Behaviors.same + } + case other => + context.log.warn("Unexpected message: {}", other) + Behaviors.same + } + } + } +} + +object Main extends App { + + ActorSystem(RootBehavior(), "R2dbcTester") + +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7fcdc2ab..c8ba0ca1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val Scala3 = "3.3.1" val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.1") + val AkkaVersion = System.getProperty("override.akka.version", "2.9.2") val AkkaVersionInDocs = AkkaVersion.take(3) val AkkaPersistenceJdbcVersion = "5.2.0" // only in migration tool tests val AkkaProjectionVersionInDocs = "current"