Skip to content

Commit

Permalink
feat: ChangeEvent from Durable State (akka#32254)
Browse files Browse the repository at this point in the history
* by storing additional change events when Durable State is updated and
  deleted we make it possible to use all nice event sourced Projection
  capabilities with Durable State, including Projections over gRPC
* otherwise we would have to duplicate many many things to use the
  existing DurableStateChange and the queries based on that
* we might even deprecate the existing DurableStateChange if this
  works out, but that can be a later decision
* ApiMayChange
* reference docs for plugin api
* note about mutable state
  • Loading branch information
patriknw authored and He-Pin committed Jan 7, 2024
1 parent b7741c8 commit 37a5b46
Show file tree
Hide file tree
Showing 22 changed files with 827 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package docs.persistence.state;

// #plugin-imports

import akka.Done;
import akka.actor.ExtendedActorSystem;
import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore;
import akka.persistence.state.javadsl.GetObjectResult;
import com.typesafe.config.Config;
import java.util.concurrent.CompletionStage;

// #plugin-imports

// #state-store-plugin-api
class MyChangeEventJavaStateStore<A> implements DurableStateUpdateWithChangeEventStore<A> {

private ExtendedActorSystem system;
private Config config;
private String cfgPath;

public MyChangeEventJavaStateStore(ExtendedActorSystem system, Config config, String cfgPath) {
this.system = system;
this.config = config;
this.cfgPath = cfgPath;
}

/**
* Will delete the state by setting it to the empty state and the revision number will be
* incremented by 1.
*/
@Override
public CompletionStage<Done> deleteObject(String persistenceId, long revision) {
// implement deleteObject here
return null;
}

@Override
public CompletionStage<Done> deleteObject(
String persistenceId, long revision, Object changeEvent) {
// implement deleteObject here
return null;
}

/** Returns the current state for the given persistence id. */
@Override
public CompletionStage<GetObjectResult<A>> getObject(String persistenceId) {
// implement getObject here
return null;
}

/**
* Will persist the latest state. If it’s a new persistence id, the record will be inserted.
*
* <p>In case of an existing persistence id, the record will be updated only if the revision
* number of the incoming record is 1 more than the already existing record. Otherwise persist
* will fail.
*/
@Override
public CompletionStage<Done> upsertObject(
String persistenceId, long revision, Object value, String tag) {
// implement upsertObject here
return null;
}

/** Deprecated. Use the deleteObject overload with revision instead. */
@Override
public CompletionStage<Done> deleteObject(String persistenceId) {
return deleteObject(persistenceId, 0);
}

@Override
public CompletionStage<Done> upsertObject(
String persistenceId, long revision, A value, String tag, Object changeEvent) {
// implement deleteObject here
return null;
}
}
// #state-store-plugin-api
10 changes: 10 additions & 0 deletions akka-docs/src/main/paradox/durable-state/state-store-plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ Scala
Java
: @@snip [MyJavaStateStore.java](/akka-docs/src/main/java/docs/persistence/state/MyJavaStateStore.java) { #state-store-plugin-api }

A durable state store plugin may also extend `DurableStateUpdateWithChangeEventStore` to store additional change event.

`DurableStateUpdateWithChangeEventStore` is an interface and the methods to be implemented are:

Scala
: @@snip [MyStateStore.scala](/akka-docs/src/main/scala/docs/persistence/state/MyStateStore.scala) { //#plugin-api-change-event }

Java
: @@snip [MyChangeEventJavaStateStore.java](/akka-docs/src/main/java/docs/persistence/state/MyChangeEventJavaStateStore.java) { #state-store-plugin-api }

## State Store provider

A `DurableStateStoreProvider` needs to be implemented to be able to create the plugin itself:
Expand Down
48 changes: 48 additions & 0 deletions akka-docs/src/main/scala/docs/persistence/state/MyStateStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import akka.actor.ExtendedActorSystem
import akka.persistence.state.{ DurableStateStoreProvider, DurableStateStoreRegistry }
import akka.persistence.state.scaladsl.{ DurableStateStore, DurableStateUpdateStore, GetObjectResult }
import com.typesafe.config.Config

import akka.persistence.state.javadsl.{ DurableStateStore => JDurableStateStore }
import scala.concurrent.Future

import akka.persistence.state.scaladsl.DurableStateUpdateWithChangeEventStore

//#plugin-provider
class MyStateStoreProvider(system: ExtendedActorSystem, config: Config, cfgPath: String)
extends DurableStateStoreProvider {
Expand Down Expand Up @@ -57,3 +60,48 @@ class MyStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: Stri
override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???
}
//#plugin-api

//#plugin-api-change-event
class MyChangeEventStateStore[A](system: ExtendedActorSystem, config: Config, cfgPath: String)
extends DurableStateUpdateWithChangeEventStore[A] {

/**
* The `changeEvent` is written to the event journal.
* Same `persistenceId` is used in the journal and the `revision` is used as `sequenceNr`.
*
* @param revision sequence number for optimistic locking. starts at 1.
*/
override def upsertObject(
persistenceId: String,
revision: Long,
value: A,
tag: String,
changeEvent: Any): Future[Done] = ???

override def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): Future[Done] = ???

/**
* Will persist the latest state. If it’s a new persistence id, the record will be inserted.
*
* In case of an existing persistence id, the record will be updated only if the revision
* number of the incoming record is 1 more than the already existing record. Otherwise persist will fail.
*/
override def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): Future[Done] = ???

/**
* Deprecated. Use the deleteObject overload with revision instead.
*/
override def deleteObject(persistenceId: String): Future[Done] = deleteObject(persistenceId, 0)

/**
* Will delete the state by setting it to the empty state and the revision number will be incremented by 1.
*/
override def deleteObject(persistenceId: String, revision: Long): Future[Done] = ???

/**
* Returns the current state for the given persistence id.
*/
override def getObject(persistenceId: String): Future[GetObjectResult[A]] = ???

}
//#plugin-api-change-event
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ import akka.japi.Pair
import akka.persistence.query.DurableStateChange
import akka.persistence.query.Offset
import akka.persistence.query.javadsl.{ DurableStateStorePagedPersistenceIdsQuery, DurableStateStoreQuery }
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.javadsl.CurrentEventsBySliceQuery
import akka.persistence.query.typed.javadsl.DurableStateStoreBySliceQuery
import akka.persistence.state.javadsl.DurableStateUpdateStore
import akka.persistence.query.typed.javadsl.EventsBySliceQuery
import akka.persistence.state.javadsl.DurableStateUpdateWithChangeEventStore
import akka.persistence.state.javadsl.GetObjectResult
import akka.persistence.testkit.state.scaladsl.{ PersistenceTestKitDurableStateStore => SStore }
import akka.stream.javadsl.Source
Expand All @@ -26,22 +29,30 @@ object PersistenceTestKitDurableStateStore {
}

class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A])
extends DurableStateUpdateStore[A]
extends DurableStateUpdateWithChangeEventStore[A]
with DurableStateStoreQuery[A]
with DurableStateStoreBySliceQuery[A]
with DurableStateStorePagedPersistenceIdsQuery[A] {
with DurableStateStorePagedPersistenceIdsQuery[A]
with CurrentEventsBySliceQuery
with EventsBySliceQuery {

def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] =
stateStore.getObject(persistenceId).map(_.toJava)(stateStore.system.dispatcher).toJava

def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] =
stateStore.upsertObject(persistenceId, seqNr, value, tag).toJava

def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String, changeEvent: Any): CompletionStage[Done] =
stateStore.upsertObject(persistenceId, seqNr, value, tag, changeEvent).toJava

def deleteObject(persistenceId: String): CompletionStage[Done] = CompletableFuture.completedFuture(Done)

def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] =
stateStore.deleteObject(persistenceId, revision).toJava

def deleteObject(persistenceId: String, revision: Long, changeEvent: Any): CompletionStage[Done] =
stateStore.deleteObject(persistenceId, revision, changeEvent).toJava

def changes(tag: String, offset: Offset): Source[DurableStateChange[A], akka.NotUsed] = {
stateStore.changes(tag, offset).asJava
}
Expand Down Expand Up @@ -77,4 +88,23 @@ class PersistenceTestKitDurableStateStore[A](stateStore: SStore[A])
override def currentPersistenceIds(afterId: Optional[String], limit: Long): Source[String, NotUsed] =
stateStore.currentPersistenceIds(afterId.asScala, limit).asJava

/**
* For change events.
*/
override def currentEventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed] =
stateStore.currentEventsBySlices(entityType, minSlice, maxSlice, offset).asJava

/**
* For change events.
*/
override def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[EventEnvelope[Event], NotUsed] =
stateStore.eventsBySlices(entityType, minSlice, maxSlice, offset).asJava
}
Loading

0 comments on commit 37a5b46

Please sign in to comment.