Skip to content

Commit

Permalink
Producer filtering of events, first stab
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Jun 27, 2023
1 parent e200c1b commit 45e023f
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 110 deletions.
5 changes: 4 additions & 1 deletion akka-projection-grpc-tests/src/it/resources/logback-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
<logger name="akka.actor.typed.pubsub" level="INFO" />
<logger name="akka.http" level="INFO" />
<logger name="akka.cluster.typed.internal.receptionist" level="INFO" />
<logger name="io.grpc.netty.shaded.io.grpc.netty" level="INFO" />
<logger name="io.grpc.netty" level="INFO" />
<logger name="io.netty" level="INFO" />
<logger name="io.r2dbc.postgresql" level="INFO" />
<logger name="reactor.netty.resources" level="INFO" />
<logger name="io.r2dbc.pool" level="INFO" />
<logger name="com.github.dockerjava" level="INFO" />
<logger name="org.testcontainers" level="INFO" />

<root level="TRACE">
<appender-ref ref="CapturingAppender"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.grpc.GrpcClientSettings
import akka.http.scaladsl.Http
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.ProjectionBehavior
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
Expand All @@ -22,14 +24,18 @@ import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.TestEntity
import akka.projection.grpc.internal.EventConsumerServiceImpl
import akka.projection.grpc.internal.EventPusher
import akka.projection.grpc.internal.FilteredPayloadMapper
import akka.projection.grpc.internal.proto.EventConsumerServiceClient
import akka.projection.grpc.internal.proto.EventConsumerServiceHandler
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
import akka.projection.r2dbc.R2dbcProjectionSettings
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._

Expand Down Expand Up @@ -78,12 +84,16 @@ object ProducerPushSpec {
# reducing this to have quicker test, triggers backtracking earlier
backtracking.behind-current-time = 3 seconds
}
journal.publish-events-number-of-topics = 2
}

# consumer uses its own h2
test.consumer.r2dbc = $${akka.persistence.r2dbc}
test.consumer.r2dbc.connection-factory = $${akka.persistence.r2dbc.h2}
test.consumer.r2dbc.connection-factory.database = "consumer-db"
test.consumer.projection = $${akka.projection.r2dbc}
test.consumer.projection.use-connection-factory = "test.consumer.r2dbc.connection-factory"
""").withFallback(ConfigFactory.load("persistence.conf")).resolve()
}
class ProducerPushSpec(testContainerConf: TestContainerConf)
Expand All @@ -104,29 +114,36 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)
override def typedSystem: ActorSystem[_] = system

val entityType = nextEntityType()
val streamId = "entity_stream_id"
val producerProjectionId = randomProjectionId()
val consumerProjectionId = randomProjectionId()

lazy val consumerProjectionSettings: R2dbcProjectionSettings =
R2dbcProjectionSettings(typedSystem.settings.config.getConfig("test.consumer.projection"))

val grpcPort = 9588

def producerSourceProvider =
EventSourcedProvider.eventsBySlices[String](system, R2dbcReadJournal.Identifier, entityType, 0, 1023)
def producerSourceProvider(eps: EventProducerSource) =
EventSourcedProvider.eventsBySlices[String](system, R2dbcReadJournal.Identifier, eps.entityType, 0, 1023)

val eventConsumerClient = EventConsumerServiceClient(
GrpcClientSettings.connectToServiceAt("127.0.0.1", grpcPort).withTls(false))

// this projection runs in the producer and pushes events over grpc to the consumer
def spawnProducerReplicationProjection(): ActorRef[ProjectionBehavior.Command] =
def spawnProducerReplicationProjection(eps: EventProducerSource): ActorRef[ProjectionBehavior.Command] =
spawn(
ProjectionBehavior(
R2dbcProjection.atLeastOnceAsync(
R2dbcProjection.atLeastOnceAsync[Offset, EventEnvelope[String]](
producerProjectionId,
settings = None,
sourceProvider = producerSourceProvider,
handler = () => new EventPusher[String](eventConsumerClient, Transformation.identity))))
sourceProvider = producerSourceProvider(eps),
handler = () => new EventPusher(eventConsumerClient, eps))))

def counsumerSourceProvider =
EventSourcedProvider.eventsBySlices[String](system, "test.consumer.r2dbc.query", entityType, 0, 1023)
def counsumerSourceProvider = {
// FIXME how do we auto-wrap with this?
new FilteredPayloadMapper(
EventSourcedProvider.eventsBySlices[String](system, "test.consumer.r2dbc.query", entityType, 0, 1023))
}

// this projection runs in the consumer and just consumes the already projected events
def spawnConsumerProjection(probe: ActorRef[EventEnvelope[String]]) =
Expand All @@ -142,6 +159,21 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)
Future.successful(Done)
})))

/**
 * Test fixture setup: wipe the consumer-side journal and offset tables so each
 * run of this spec starts from a clean consumer database.
 * NOTE(review): Await at the edge of a test fixture is acceptable here; the
 * 10 s timeouts bound how long setup may block.
 */
override protected def beforeAll(): Unit = {
super.beforeAll()
// Settings for the consumer's own database (separate H2 instance, see config above)
lazy val consumerSettings: R2dbcSettings =
R2dbcSettings(typedSystem.settings.config.getConfig("test.consumer.r2dbc"))
// Clear previously replicated events from the consumer journal
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
_.createStatement(s"delete from ${consumerSettings.journalTableWithSchema}")),
10.seconds)
// Clear stored projection offsets so the consumer projection replays from scratch
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
_.createStatement(s"delete from ${consumerProjectionSettings.timestampOffsetTableWithSchema}")),
10.seconds)
// FIXME clean up consumer tables!
}

"Producer pushed events" should {

"show up on consumer side" in {
Expand All @@ -162,7 +194,19 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)

// FIXME higher level API for the producer side of this?
// FIXME producer filters
spawnProducerReplicationProjection()
val veggies = Set("cucumber")
// FIXME features provided by EventProducerSource may not be overlapping enough that we need it here
val eps =
EventProducerSource[String](
entityType,
streamId,
Transformation.identity,
EventProducerSettings(system),
producerFilter = envelope =>
// no veggies allowed
!veggies(envelope.event))

spawnProducerReplicationProjection(eps)

// local "regular" projections consume the projected events
val consumerProbe = createTestProbe[EventEnvelope[String]]()
Expand All @@ -174,11 +218,17 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)
val pid = nextPid(entityType)
// running this directly, as in producing system (even though they share actor system)
// written in its own db, replicated over grpc to the consumer db.
val entity = spawn(TestEntity(pid))
entity ! TestEntity.Persist("bananas")
val entity1 = spawn(TestEntity(pid))
entity1 ! TestEntity.Persist("bananas")
entity1 ! TestEntity.Persist("cucumber") // producer filter - never seen in consumer
entity1 ! TestEntity.Persist("mangos")

// event projected into consumer journal and shows up in local projection
consumerProbe.receiveMessage(10.seconds).event should be("bananas")

// Note: filtered does not show up in projection at all

consumerProbe.receiveMessage().event should be("mangos")
}
}

Expand Down
1 change: 1 addition & 0 deletions akka-projection-grpc/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ akka {
serialization-bindings {
"akka.projection.grpc.internal.DdataConsumerFilterStore$State" = akka-projection-grpc-consumer
"akka.projection.grpc.internal.DdataConsumerFilterStore$ConsumerFilterKey" = akka-projection-grpc-consumer
"akka.projection.grpc.internal.FilteredPayload$" = akka-projection-grpc-consumer
}
}
}
157 changes: 82 additions & 75 deletions akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
Expand Down Expand Up @@ -35,83 +36,89 @@ private[akka] object EventWriter {

def apply(journalPluginId: String): Behavior[Command] =
Behaviors
.setup[AnyRef] { context =>
val writerUuid = UUID.randomUUID().toString
val journal = Persistence(context.system).journalFor(journalPluginId)
context.log.debug("Event writer for journal [{}] starting up", journalPluginId)

val waitingForResponse = new util.HashMap[(String, Long), ActorRef[StatusReply[Done]]]()

Behaviors.receiveMessage {
case Write(persistenceId, sequenceNumber, event, metadata, replyTo) =>
// FIXME trace or remove
context.log.debug("Writing event persistence id [{}], sequence nr [{}]", persistenceId, sequenceNumber)
val repr = PersistentRepr(
event,
persistenceId = persistenceId,
sequenceNr = sequenceNumber,
manifest = "", // adapters would be on the producing side, already applied
writerUuid = writerUuid,
sender = akka.actor.ActorRef.noSender)

val write = AtomicWrite(metadata match {
case Some(meta) => repr.withMetadata(meta)
case _ => repr
}) :: Nil

waitingForResponse.put((persistenceId, sequenceNumber), replyTo)

journal ! JournalProtocol.WriteMessages(write, context.self.toClassic, context.self.path.uid)
Behaviors.same

case JournalProtocol.WriteMessageSuccess(message, _) =>
val pidSeqnr = (message.persistenceId, message.sequenceNr)
waitingForResponse.get(pidSeqnr) match {
case null =>
context.log.warn2(
"Got write success reply for event with no waiting request, probably a bug (pid {}, seq nr {})",
message.persistenceId,
message.sequenceNr)
Behaviors.same
case replyTo =>
if (context.log.isTraceEnabled)
context.log.trace2(
"Successfully wrote event persistence id [{}], sequence nr [{}]",
.supervise(Behaviors
.setup[AnyRef] { context =>
val writerUuid = UUID.randomUUID().toString
val journal = Persistence(context.system).journalFor(journalPluginId)
context.log.debug("Event writer for journal [{}] starting up", journalPluginId)

val waitingForResponse = new util.HashMap[(String, Long), ActorRef[StatusReply[Done]]]()

Behaviors.receiveMessage {
case Write(persistenceId, sequenceNumber, event, metadata, replyTo) =>
if (context.log.isTraceEnabled)
context.log.traceN(
"Writing event persistence id [{}], sequence nr [{}], payload {}",
persistenceId,
sequenceNumber,
event)
val repr = PersistentRepr(
event,
persistenceId = persistenceId,
sequenceNr = sequenceNumber,
manifest = "", // adapters would be on the producing side, already applied
writerUuid = writerUuid,
sender = akka.actor.ActorRef.noSender)

val write = AtomicWrite(metadata match {
case Some(meta) => repr.withMetadata(meta)
case _ => repr
}) :: Nil

waitingForResponse.put((persistenceId, sequenceNumber), replyTo)

journal ! JournalProtocol.WriteMessages(write, context.self.toClassic, context.self.path.uid)
Behaviors.same

case JournalProtocol.WriteMessageSuccess(message, _) =>
val pidSeqnr = (message.persistenceId, message.sequenceNr)
waitingForResponse.get(pidSeqnr) match {
case null =>
context.log.warn2(
"Got write success reply for event with no waiting request, probably a bug (pid {}, seq nr {})",
message.persistenceId,
message.sequenceNr)
replyTo ! StatusReply.success(Done)
waitingForResponse.remove(pidSeqnr)
Behaviors.same
}
Behaviors.same

case JournalProtocol.WriteMessageFailure(message, error, _) =>
val pidSeqnr = (message.persistenceId, message.sequenceNr)
waitingForResponse.get(pidSeqnr) match {
case null =>
context.log.warnN(
s"Got error reply for event with no waiting request, probably a bug (pid ${message.persistenceId}, seq nr ${message.sequenceNr})",
error)
Behaviors.same
case replyTo =>
context.log.warnN(
"Failed writing event persistence id [{}], sequence nr [{}]: {}",
message.persistenceId,
message.sequenceNr,
error.getMessage)

replyTo ! StatusReply.error(error.getMessage)

waitingForResponse.remove(pidSeqnr)
Behaviors.same
}

case _ =>
// ignore all other journal protocol messages
Behaviors.same

}
}
Behaviors.same
case replyTo =>
if (context.log.isTraceEnabled)
context.log.trace2(
"Successfully wrote event persistence id [{}], sequence nr [{}]",
message.persistenceId,
message.sequenceNr)
replyTo ! StatusReply.success(Done)
waitingForResponse.remove(pidSeqnr)
Behaviors.same
}
Behaviors.same

case JournalProtocol.WriteMessageFailure(message, error, _) =>
val pidSeqnr = (message.persistenceId, message.sequenceNr)
waitingForResponse.get(pidSeqnr) match {
case null =>
context.log.warnN(
s"Got error reply for event with no waiting request, probably a bug (pid ${message.persistenceId}, seq nr ${message.sequenceNr})",
error)
Behaviors.same
case replyTo =>
context.log.warnN(
"Failed writing event persistence id [{}], sequence nr [{}]: {}",
message.persistenceId,
message.sequenceNr,
error.getMessage)

replyTo ! StatusReply.error(error.getMessage)

waitingForResponse.remove(pidSeqnr)
Behaviors.same
}

case _ =>
// ignore all other journal protocol messages
Behaviors.same

}
})
.onFailure[Exception](SupervisorStrategy.restart)
.narrow[Command]

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import scalapb.GeneratedMessage
with BaseSerializer {
private val ConsumerFilterStoreStateManifest = "A"
private val ConsumerFilterKeyManifest = "B"
private val FilteredPayloadManifest = "C"

private final val CompressionBufferSize = 1024 * 4

Expand All @@ -40,13 +41,15 @@ import scalapb.GeneratedMessage
/**
 * Returns the serialization manifest for a supported object.
 * Supported: DdataConsumerFilterStore.State, DdataConsumerFilterStore.ConsumerFilterKey
 * and the FilteredPayload marker; anything else is a programming error.
 */
override def manifest(obj: AnyRef): String =
  obj match {
    case _: DdataConsumerFilterStore.State             => ConsumerFilterStoreStateManifest
    case _: DdataConsumerFilterStore.ConsumerFilterKey => ConsumerFilterKeyManifest
    case FilteredPayload                               => FilteredPayloadManifest
    case _ =>
      throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
  }

/**
 * Serializes a supported object to bytes.
 * State is protobuf-encoded then compressed; a filter key reuses the replicated-data
 * key-id encoding; FilteredPayload is a pure marker and serializes to zero bytes.
 */
override def toBinary(obj: AnyRef): Array[Byte] =
  obj match {
    case state: DdataConsumerFilterStore.State =>
      compress(stateToProto(state))
    case key: DdataConsumerFilterStore.ConsumerFilterKey =>
      replicatedDataSerializer.keyIdToBinary(key.id)
    case FilteredPayload =>
      // marker object carries no data — empty payload, identified by manifest alone
      Array.empty[Byte]
    case _ =>
      throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
  }
Expand All @@ -55,6 +58,7 @@ import scalapb.GeneratedMessage
case ConsumerFilterStoreStateManifest => stateFromBinary(decompress(bytes))
case ConsumerFilterKeyManifest =>
DdataConsumerFilterStore.ConsumerFilterKey(replicatedDataSerializer.keyIdFromBinary(bytes))
case FilteredPayloadManifest => FilteredPayload
case _ =>
throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
Expand Down
Loading

0 comments on commit 45e023f

Please sign in to comment.