Skip to content

Commit

Permalink
Working for H2 and event source, snapshot, durable state
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Feb 23, 2024
1 parent 12456d6 commit c003f78
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@
]
}
]
},
{
"name": "akka.persistence.r2dbc.state.R2dbcDurableStateStoreProvider",
"methods": [
{
"name": "<init>",
"parameterTypes": [
"akka.actor.ExtendedActorSystem",
"com.typesafe.config.Config",
"java.lang.String"
]
}
]
}

]
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (C) 2009-2024 Lightbend Inc. <https://www.lightbend.com>
*/
package com.lightbend

import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.state.scaladsl.DurableStateBehavior
import akka.persistence.typed.state.scaladsl.Effect
import akka.serialization.jackson.JsonSerializable
import com.fasterxml.jackson.annotation.JsonCreator

import scala.concurrent.duration.DurationInt

object DurableStateCounter {
sealed trait Command extends JsonSerializable
final case class Increase(amount: Int, replyTo: ActorRef[Increased]) extends Command

// FIXME why doesn't @JsonCreator work as usual? is it something missing from the jackson feature?
final case class GetState @JsonCreator() (replyTo: ActorRef[State]) extends Command

final case class Increased @JsonCreator() (newValue: Int) extends JsonSerializable

final case class State @JsonCreator() (value: Int) extends JsonSerializable
def apply(id: String): Behavior[Command] =
DurableStateBehavior[Command, State](
PersistenceId("DSCounter", id),
State(0),
{
case (state, Increase(amount, replyTo)) =>
Effect.persist(State(state.value + amount)).thenReply(replyTo)(newState => Increased(newState.value))
case (state, GetState(replyTo)) =>
Effect.reply(replyTo)(state)
})
}

object DurableStateTester {

def apply(whenDone: ActorRef[String]): Behavior[AnyRef] = Behaviors.setup { context =>
Behaviors.withTimers { timers =>
timers.startSingleTimer("Timeout", 10.seconds)

var durableActor = context.spawn(DurableStateCounter("one"), "DurableOne")
context.watchWith(durableActor, "DurableOneStopped")

def messageOrTimeout(step: String)(partial: PartialFunction[AnyRef, Behavior[AnyRef]]): Behavior[AnyRef] = {
context.log.info("On {}", step)
Behaviors.receiveMessage(message =>
partial.orElse[AnyRef, Behavior[AnyRef]] {
case "Timeout" =>
context.log.error(s"Durable state checks timed out in {}", step)
System.exit(1)
Behaviors.same

case other =>
context.log.warn("Unexpected message in {}: {}", step, other)
Behaviors.same
}(message))
}

durableActor ! DurableStateCounter.Increase(1, context.self)

def step1() = messageOrTimeout("step1") { case DurableStateCounter.Increased(1) =>
// write works
context.stop(durableActor)
step2()
}

def step2() = messageOrTimeout("step2") { case "DurableOneStopped" =>
durableActor = context.spawn(DurableStateCounter("one"), "DurableOneIncarnation2")
durableActor ! DurableStateCounter.GetState(context.self)
step3()
}

def step3() = messageOrTimeout("step3") { case DurableStateCounter.State(1) =>
whenDone ! "Durable State works"
Behaviors.stopped
}

step1()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,21 @@ import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.serialization.jackson.JsonSerializable
import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty

import scala.concurrent.duration.DurationInt

object EventSourcedCounter {
sealed trait Command extends JsonSerializable

final case class Increase(amount: Int, replyTo: ActorRef[StatusReply[Increased]]) extends Command
@JsonCreator
final case class GetValue(replyTo: ActorRef[StatusReply[GetValueResponse]]) extends Command
final case class GetValue @JsonCreator() (replyTo: ActorRef[StatusReply[GetValueResponse]]) extends Command
final case class GetValueResponse(value: Int)

sealed trait Event extends JsonSerializable

// FIXME why doesn't @JsonCreator work as usual? is it something missing from the akka jackson feature?
final case class Increased(@JsonProperty("amount") amount: Int) extends Event
final case class Increased @JsonCreator() (amount: Int) extends Event

final case class State(@JsonProperty("value") value: Int) extends JsonSerializable
final case class State @JsonCreator() (value: Int) extends JsonSerializable

def apply(id: String): Behavior[Command] = EventSourcedBehavior[Command, Event, State](
PersistenceId("EventSourcedHelloWorld", id),
Expand Down
38 changes: 25 additions & 13 deletions native-image-tests/src/main/scala/com/lightbend/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,36 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

import scala.concurrent.duration.DurationInt

object RootBehavior {
def apply(): Behavior[AnyRef] = Behaviors.setup { context =>
context.spawn(EsbTester(context.self), "ESBTester")
Behaviors.withTimers { timers =>
timers.startSingleTimer("Timeout", 30.seconds)
context.spawn(EsbTester(context.self), "ESBTester")
context.spawn(DurableStateTester(context.self), "DurableStateTester")

var awaitedOks = Set("ESB works", "Durable State works")

var awaitedOks = Set("ESB works")
Behaviors.receiveMessage {
case "Timeout" =>
context.log.error("Suite of checks timed out, missing awaitedOks: {}", awaitedOks)
System.exit(1)
Behaviors.same

Behaviors.receiveMessage {
case string: String =>
awaitedOks -= string
if (awaitedOks.isEmpty) {
context.log.info("All checks ok, shutting down")
Behaviors.stopped
} else {
case string: String =>
awaitedOks -= string
if (awaitedOks.isEmpty) {
context.log.info("All checks ok, shutting down")
Behaviors.stopped
} else {
context.log.info("Continuing, awaitedOks not empty: {}", awaitedOks)
Behaviors.same
}
case other =>
context.log.warn("Unexpected message: {}", other)
Behaviors.same
}
case other =>
context.log.warn("Unexpected message: {}", other)
Behaviors.same
}
}
}
}
Expand Down

0 comments on commit c003f78

Please sign in to comment.