diff --git a/akka-projection-grpc-tests/src/it/resources/logback-test.xml b/akka-projection-grpc-tests/src/it/resources/logback-test.xml
index e4ef24824..4e4e77a79 100644
--- a/akka-projection-grpc-tests/src/it/resources/logback-test.xml
+++ b/akka-projection-grpc-tests/src/it/resources/logback-test.xml
@@ -22,10 +22,13 @@
-
+
+
+
+
diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/producer/ProducerPushSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/producer/ProducerPushSpec.scala
index 52a1e7944..06b740699 100644
--- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/producer/ProducerPushSpec.scala
+++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/producer/ProducerPushSpec.scala
@@ -12,7 +12,9 @@ import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.grpc.GrpcClientSettings
import akka.http.scaladsl.Http
+import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
+import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.ProjectionBehavior
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
@@ -22,14 +24,18 @@ import akka.projection.grpc.TestDbLifecycle
import akka.projection.grpc.TestEntity
import akka.projection.grpc.internal.EventConsumerServiceImpl
import akka.projection.grpc.internal.EventPusher
+import akka.projection.grpc.internal.FilteredPayloadMapper
import akka.projection.grpc.internal.proto.EventConsumerServiceClient
import akka.projection.grpc.internal.proto.EventConsumerServiceHandler
+import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import akka.projection.r2dbc.R2dbcProjectionSettings
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
+import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -78,12 +84,16 @@ object ProducerPushSpec {
# reducing this to have quicker test, triggers backtracking earlier
backtracking.behind-current-time = 3 seconds
}
+ journal.publish-events-number-of-topics = 2
}
# consumer uses its own h2
test.consumer.r2dbc = $${akka.persistence.r2dbc}
test.consumer.r2dbc.connection-factory = $${akka.persistence.r2dbc.h2}
test.consumer.r2dbc.connection-factory.database = "consumer-db"
+
+ test.consumer.projection = $${akka.projection.r2dbc}
+ test.consumer.projection.use-connection-factory = "test.consumer.r2dbc.connection-factory"
""").withFallback(ConfigFactory.load("persistence.conf")).resolve()
}
class ProducerPushSpec(testContainerConf: TestContainerConf)
@@ -104,29 +114,36 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)
override def typedSystem: ActorSystem[_] = system
val entityType = nextEntityType()
+ val streamId = "entity_stream_id"
val producerProjectionId = randomProjectionId()
val consumerProjectionId = randomProjectionId()
+ lazy val consumerProjectionSettings: R2dbcProjectionSettings =
+ R2dbcProjectionSettings(typedSystem.settings.config.getConfig("test.consumer.projection"))
+
val grpcPort = 9588
- def producerSourceProvider =
- EventSourcedProvider.eventsBySlices[String](system, R2dbcReadJournal.Identifier, entityType, 0, 1023)
+ def producerSourceProvider(eps: EventProducerSource) =
+ EventSourcedProvider.eventsBySlices[String](system, R2dbcReadJournal.Identifier, eps.entityType, 0, 1023)
val eventConsumerClient = EventConsumerServiceClient(
GrpcClientSettings.connectToServiceAt("127.0.0.1", grpcPort).withTls(false))
// this projection runs in the producer and pushes events over grpc to the consumer
- def spawnProducerReplicationProjection(): ActorRef[ProjectionBehavior.Command] =
+ def spawnProducerReplicationProjection(eps: EventProducerSource): ActorRef[ProjectionBehavior.Command] =
spawn(
ProjectionBehavior(
- R2dbcProjection.atLeastOnceAsync(
+ R2dbcProjection.atLeastOnceAsync[Offset, EventEnvelope[String]](
producerProjectionId,
settings = None,
- sourceProvider = producerSourceProvider,
- handler = () => new EventPusher[String](eventConsumerClient, Transformation.identity))))
+ sourceProvider = producerSourceProvider(eps),
+ handler = () => new EventPusher(eventConsumerClient, eps))))
- def counsumerSourceProvider =
- EventSourcedProvider.eventsBySlices[String](system, "test.consumer.r2dbc.query", entityType, 0, 1023)
+ def counsumerSourceProvider = {
+      // FIXME can we wrap the source provider with FilteredPayloadMapper automatically instead of requiring it here?
+ new FilteredPayloadMapper(
+ EventSourcedProvider.eventsBySlices[String](system, "test.consumer.r2dbc.query", entityType, 0, 1023))
+ }
// this projection runs in the consumer and just consumes the already projected events
def spawnConsumerProjection(probe: ActorRef[EventEnvelope[String]]) =
@@ -142,6 +159,21 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)
Future.successful(Done)
})))
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ lazy val consumerSettings: R2dbcSettings =
+ R2dbcSettings(typedSystem.settings.config.getConfig("test.consumer.r2dbc"))
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(
+ _.createStatement(s"delete from ${consumerSettings.journalTableWithSchema}")),
+ 10.seconds)
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(
+ _.createStatement(s"delete from ${consumerProjectionSettings.timestampOffsetTableWithSchema}")),
+ 10.seconds)
+ // FIXME clean up consumer tables!
+ }
+
"Producer pushed events" should {
"show up on consumer side" in {
@@ -162,7 +194,19 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)
// FIXME higher level API for the producer side of this?
// FIXME producer filters
- spawnProducerReplicationProjection()
+ val veggies = Set("cucumber")
+      // FIXME the features provided by EventProducerSource may not overlap enough with this use case to justify reusing it here
+ val eps =
+ EventProducerSource[String](
+ entityType,
+ streamId,
+ Transformation.identity,
+ EventProducerSettings(system),
+ producerFilter = envelope =>
+ // no veggies allowed
+ !veggies(envelope.event))
+
+ spawnProducerReplicationProjection(eps)
// local "regular" projections consume the projected events
val consumerProbe = createTestProbe[EventEnvelope[String]]()
@@ -174,11 +218,17 @@ class ProducerPushSpec(testContainerConf: TestContainerConf)
val pid = nextPid(entityType)
// running this directly, as in producing system (even though they share actor system)
// written in its own db, replicated over grpc to the consumer db.
- val entity = spawn(TestEntity(pid))
- entity ! TestEntity.Persist("bananas")
+ val entity1 = spawn(TestEntity(pid))
+ entity1 ! TestEntity.Persist("bananas")
+ entity1 ! TestEntity.Persist("cucumber") // producer filter - never seen in consumer
+ entity1 ! TestEntity.Persist("mangos")
// event projected into consumer journal and shows up in local projection
consumerProbe.receiveMessage(10.seconds).event should be("bananas")
+
+      // Note: the filtered event ("cucumber") never shows up in the projection at all
+
+ consumerProbe.receiveMessage().event should be("mangos")
}
}
diff --git a/akka-projection-grpc/src/main/resources/reference.conf b/akka-projection-grpc/src/main/resources/reference.conf
index f5aca4979..75baae816 100644
--- a/akka-projection-grpc/src/main/resources/reference.conf
+++ b/akka-projection-grpc/src/main/resources/reference.conf
@@ -53,6 +53,7 @@ akka {
serialization-bindings {
"akka.projection.grpc.internal.DdataConsumerFilterStore$State" = akka-projection-grpc-consumer
"akka.projection.grpc.internal.DdataConsumerFilterStore$ConsumerFilterKey" = akka-projection-grpc-consumer
+ "akka.projection.grpc.internal.FilteredPayload$" = akka-projection-grpc-consumer
}
}
}
diff --git a/akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala b/akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
index 5904ef08e..2061d6346 100644
--- a/akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
+++ b/akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
@@ -7,6 +7,7 @@ package akka.persistence
import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
+import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
@@ -35,83 +36,89 @@ private[akka] object EventWriter {
def apply(journalPluginId: String): Behavior[Command] =
Behaviors
- .setup[AnyRef] { context =>
- val writerUuid = UUID.randomUUID().toString
- val journal = Persistence(context.system).journalFor(journalPluginId)
- context.log.debug("Event writer for journal [{}] starting up", journalPluginId)
-
- val waitingForResponse = new util.HashMap[(String, Long), ActorRef[StatusReply[Done]]]()
-
- Behaviors.receiveMessage {
- case Write(persistenceId, sequenceNumber, event, metadata, replyTo) =>
- // FIXME trace or remove
- context.log.debug("Writing event persistence id [{}], sequence nr [{}]", persistenceId, sequenceNumber)
- val repr = PersistentRepr(
- event,
- persistenceId = persistenceId,
- sequenceNr = sequenceNumber,
- manifest = "", // adapters would be on the producing side, already applied
- writerUuid = writerUuid,
- sender = akka.actor.ActorRef.noSender)
-
- val write = AtomicWrite(metadata match {
- case Some(meta) => repr.withMetadata(meta)
- case _ => repr
- }) :: Nil
-
- waitingForResponse.put((persistenceId, sequenceNumber), replyTo)
-
- journal ! JournalProtocol.WriteMessages(write, context.self.toClassic, context.self.path.uid)
- Behaviors.same
-
- case JournalProtocol.WriteMessageSuccess(message, _) =>
- val pidSeqnr = (message.persistenceId, message.sequenceNr)
- waitingForResponse.get(pidSeqnr) match {
- case null =>
- context.log.warn2(
- "Got write success reply for event with no waiting request, probably a bug (pid {}, seq nr {})",
- message.persistenceId,
- message.sequenceNr)
- Behaviors.same
- case replyTo =>
- if (context.log.isTraceEnabled)
- context.log.trace2(
- "Successfully wrote event persistence id [{}], sequence nr [{}]",
+ .supervise(Behaviors
+ .setup[AnyRef] { context =>
+ val writerUuid = UUID.randomUUID().toString
+ val journal = Persistence(context.system).journalFor(journalPluginId)
+ context.log.debug("Event writer for journal [{}] starting up", journalPluginId)
+
+ val waitingForResponse = new util.HashMap[(String, Long), ActorRef[StatusReply[Done]]]()
+
+ Behaviors.receiveMessage {
+ case Write(persistenceId, sequenceNumber, event, metadata, replyTo) =>
+ if (context.log.isTraceEnabled)
+ context.log.traceN(
+ "Writing event persistence id [{}], sequence nr [{}], payload {}",
+ persistenceId,
+ sequenceNumber,
+ event)
+ val repr = PersistentRepr(
+ event,
+ persistenceId = persistenceId,
+ sequenceNr = sequenceNumber,
+ manifest = "", // adapters would be on the producing side, already applied
+ writerUuid = writerUuid,
+ sender = akka.actor.ActorRef.noSender)
+
+ val write = AtomicWrite(metadata match {
+ case Some(meta) => repr.withMetadata(meta)
+ case _ => repr
+ }) :: Nil
+
+ waitingForResponse.put((persistenceId, sequenceNumber), replyTo)
+
+ journal ! JournalProtocol.WriteMessages(write, context.self.toClassic, context.self.path.uid)
+ Behaviors.same
+
+ case JournalProtocol.WriteMessageSuccess(message, _) =>
+ val pidSeqnr = (message.persistenceId, message.sequenceNr)
+ waitingForResponse.get(pidSeqnr) match {
+ case null =>
+ context.log.warn2(
+ "Got write success reply for event with no waiting request, probably a bug (pid {}, seq nr {})",
message.persistenceId,
message.sequenceNr)
- replyTo ! StatusReply.success(Done)
- waitingForResponse.remove(pidSeqnr)
- Behaviors.same
- }
- Behaviors.same
-
- case JournalProtocol.WriteMessageFailure(message, error, _) =>
- val pidSeqnr = (message.persistenceId, message.sequenceNr)
- waitingForResponse.get(pidSeqnr) match {
- case null =>
- context.log.warnN(
- s"Got error reply for event with no waiting request, probably a bug (pid ${message.persistenceId}, seq nr ${message.sequenceNr})",
- error)
- Behaviors.same
- case replyTo =>
- context.log.warnN(
- "Failed writing event persistence id [{}], sequence nr [{}]: {}",
- message.persistenceId,
- message.sequenceNr,
- error.getMessage)
-
- replyTo ! StatusReply.error(error.getMessage)
-
- waitingForResponse.remove(pidSeqnr)
- Behaviors.same
- }
-
- case _ =>
- // ignore all other journal protocol messages
- Behaviors.same
-
- }
- }
+ Behaviors.same
+ case replyTo =>
+ if (context.log.isTraceEnabled)
+ context.log.trace2(
+ "Successfully wrote event persistence id [{}], sequence nr [{}]",
+ message.persistenceId,
+ message.sequenceNr)
+ replyTo ! StatusReply.success(Done)
+ waitingForResponse.remove(pidSeqnr)
+ Behaviors.same
+ }
+ Behaviors.same
+
+ case JournalProtocol.WriteMessageFailure(message, error, _) =>
+ val pidSeqnr = (message.persistenceId, message.sequenceNr)
+ waitingForResponse.get(pidSeqnr) match {
+ case null =>
+ context.log.warnN(
+ s"Got error reply for event with no waiting request, probably a bug (pid ${message.persistenceId}, seq nr ${message.sequenceNr})",
+ error)
+ Behaviors.same
+ case replyTo =>
+ context.log.warnN(
+ "Failed writing event persistence id [{}], sequence nr [{}]: {}",
+ message.persistenceId,
+ message.sequenceNr,
+ error.getMessage)
+
+ replyTo ! StatusReply.error(error.getMessage)
+
+ waitingForResponse.remove(pidSeqnr)
+ Behaviors.same
+ }
+
+ case _ =>
+ // ignore all other journal protocol messages
+ Behaviors.same
+
+ }
+ })
+ .onFailure[Exception](SupervisorStrategy.restart)
.narrow[Command]
}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerSerializer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerSerializer.scala
index ccc340569..7aab07170 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerSerializer.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerSerializer.scala
@@ -32,6 +32,7 @@ import scalapb.GeneratedMessage
with BaseSerializer {
private val ConsumerFilterStoreStateManifest = "A"
private val ConsumerFilterKeyManifest = "B"
+ private val FilteredPayloadManifest = "C"
private final val CompressionBufferSize = 1024 * 4
@@ -40,6 +41,7 @@ import scalapb.GeneratedMessage
override def manifest(obj: AnyRef): String = obj match {
case _: DdataConsumerFilterStore.State => ConsumerFilterStoreStateManifest
case _: DdataConsumerFilterStore.ConsumerFilterKey => ConsumerFilterKeyManifest
+ case FilteredPayload => FilteredPayloadManifest
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
@@ -47,6 +49,7 @@ import scalapb.GeneratedMessage
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
case state: DdataConsumerFilterStore.State => compress(stateToProto(state))
case key: DdataConsumerFilterStore.ConsumerFilterKey => replicatedDataSerializer.keyIdToBinary(key.id)
+ case FilteredPayload => Array.empty[Byte]
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}
@@ -55,6 +58,7 @@ import scalapb.GeneratedMessage
case ConsumerFilterStoreStateManifest => stateFromBinary(decompress(bytes))
case ConsumerFilterKeyManifest =>
DdataConsumerFilterStore.ConsumerFilterKey(replicatedDataSerializer.keyIdFromBinary(bytes))
+ case FilteredPayloadManifest => FilteredPayload
case _ =>
throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala
index 541e8ac39..07edfc0aa 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala
@@ -49,6 +49,7 @@ private[akka] final class EventConsumerServiceImpl(
eventWriter: ActorRef[EventWriter.Command],
persistenceIdTransformer: String => String)(implicit system: ActorSystem[_])
extends EventConsumerService {
+
private val logger = LoggerFactory.getLogger(classOf[EventConsumerServiceImpl])
private val protoAnySerialization = new ProtoAnySerialization(system)
@@ -59,14 +60,25 @@ private[akka] final class EventConsumerServiceImpl(
val persistenceId = persistenceIdTransformer(envelope.persistenceId)
// FIXME can we skip the ask for each event?
- eventWriter.askWithStatus[Done](replyTo =>
- EventWriter.Write(persistenceId, envelope.sequenceNr, envelope.event, envelope.eventMetadata, replyTo))
+ eventWriter.askWithStatus[Done](
+ replyTo =>
+ EventWriter.Write(
+ persistenceId,
+ envelope.sequenceNr,
+ // FIXME how to deal with filtered - can't be null, should we have a marker filtered payload?
+ envelope.eventOption.getOrElse(FilteredPayload),
+ envelope.eventMetadata,
+ replyTo))
}
override def consumeEvent(in: Event): Future[Empty] = {
// FIXME do we need to make sure events for the same pid are ordered? Should be single writer per pid but?
val envelope = ProtobufProtocolConversions.eventToEnvelope[Any](in, protoAnySerialization)
- logger.trace("Saw event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId)
+ logger.trace(
+ "Saw event [{}] for pid [{}]{}",
+ envelope.sequenceNr,
+ envelope.persistenceId,
+ if (envelope.filtered) " filtered" else "")
writeEventToJournal(envelope).map(_ => Empty.defaultInstance)(ExecutionContexts.parasitic)
}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala
index 9ef1168e7..2e0fbe163 100644
--- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala
@@ -8,8 +8,9 @@ import akka.Done
import akka.actor.typed.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.persistence.query.typed.EventEnvelope
+import akka.projection.grpc.internal.ProtobufProtocolConversions.offsetToProtoOffset
import akka.projection.grpc.internal.proto.EventConsumerServiceClient
-import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
import akka.projection.scaladsl.Handler
import org.slf4j.LoggerFactory
@@ -17,19 +18,12 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.control.NonFatal
-/**
- * INTERNAL API
- */
-private[akka] object EventPusher {
- private val FutureDone = Future.successful(Done)
-}
-
/**
* INTERNAL API
*
* gRPC push protocol handler for the producing side
*/
-private[akka] class EventPusher[Event](client: EventConsumerServiceClient, transformation: Transformation)(
+private[akka] class EventPusher[Event](client: EventConsumerServiceClient, eps: EventProducerSource)(
implicit system: ActorSystem[_])
extends Handler[EventEnvelope[Event]] {
import akka.projection.grpc.internal.ProtobufProtocolConversions.transformAndEncodeEvent
@@ -39,17 +33,49 @@ private[akka] class EventPusher[Event](client: EventConsumerServiceClient, trans
private val protoAnySerialization = new ProtoAnySerialization(system)
override def process(envelope: EventEnvelope[Event]): Future[Done] = {
- transformAndEncodeEvent(transformation, envelope, protoAnySerialization).flatMap {
- case None => EventPusher.FutureDone
- case Some(event) =>
- logger.debug("Pushing event for pid [{}], seq nr [{}]", envelope.persistenceId, envelope.sequenceNr)
- client.consumeEvent(event).map(_ => Done)(ExecutionContexts.parasitic).recover {
- case NonFatal(ex) =>
- throw new RuntimeException(
- s"Error pushing event with pid [${envelope.persistenceId}] and sequence nr [${envelope.sequenceNr}] to consumer",
- ex)
- }
- }
+ val encodedEvent: Future[Option[proto.Event]] =
+ if (eps.producerFilter(envelope.asInstanceOf[EventEnvelope[Any]])) {
+ if (logger.isTraceEnabled())
+ logger.trace(
+ "Pushing event persistence id [{}], sequence number [{}]",
+ envelope.persistenceId,
+ envelope.sequenceNr)
+
+ transformAndEncodeEvent(eps.transformation, envelope, protoAnySerialization)
+ } else {
+ if (logger.isTraceEnabled())
+ logger.trace(
+ "Filtering event persistence id [{}], sequence number [{}]",
+ envelope.persistenceId,
+ envelope.sequenceNr)
+
+ Future.successful(None)
+ }
+
+ encodedEvent
+ .flatMap {
+ case Some(protoEvent) =>
+ client
+ .consumeEvent(protoEvent)
+ case None =>
+          // Filtered out or transformed to None: we still push a placeholder event so that
+          // the receiving side does not observe gaps in the sequence numbers
+ client.consumeEvent(proto.Event(
+ persistenceId = envelope.persistenceId,
+ seqNr = envelope.sequenceNr,
+ slice = envelope.slice,
+ offset = offsetToProtoOffset(envelope.offset),
+ payload = None,
+ tags = Seq.empty
+ // FIXME do we still need to pass along metadata?
+ ))
+ }
+ .map(_ => Done)(ExecutionContexts.parasitic)
+ .recover {
+ case NonFatal(ex) =>
+ throw new RuntimeException(
+ s"Error pushing event with pid [${envelope.persistenceId}] and sequence nr [${envelope.sequenceNr}] to consumer",
+ ex)
+ }
}
}
diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilteredPayload.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilteredPayload.scala
new file mode 100644
index 000000000..6aa25f7ad
--- /dev/null
+++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilteredPayload.scala
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2023 Lightbend Inc.
+ */
+
+package akka.projection.grpc.internal
+
+import akka.NotUsed
+import akka.annotation.InternalApi
+import akka.dispatch.ExecutionContexts
+import akka.persistence.query.typed.EventEnvelope
+import akka.projection.scaladsl.SourceProvider
+import akka.stream.scaladsl.Source
+
+import scala.concurrent.Future
+
+/**
+ * INTERNAL API
+ *
+ * Placeholder object for filtered events when producer event push is used
+ */
+@InternalApi
+private[akka] case object FilteredPayload
+
+/**
+ * INTERNAL API
+ *
+ * Turns envelopes with placeholder events into filtered envelopes on the consuming side of the journal
+ */
+private[akka] final class FilteredPayloadMapper[OffsetType, Event](
+ actual: SourceProvider[OffsetType, EventEnvelope[Event]])
+ extends SourceProvider[OffsetType, EventEnvelope[Event]] {
+ override def source(offset: () => Future[Option[OffsetType]]): Future[Source[EventEnvelope[Event], NotUsed]] =
+ actual
+ .source(offset)
+ .map(source =>
+ source.map { envelope =>
+ if (envelope.event.asInstanceOf[AnyRef].ne(FilteredPayload)) envelope
+ else
+ new EventEnvelope[Event](
+ persistenceId = envelope.persistenceId,
+ offset = envelope.offset,
+ entityType = envelope.entityType,
+ sequenceNr = envelope.sequenceNr,
+ eventOption = None,
+ timestamp = envelope.timestamp,
+ eventMetadata = None,
+ slice = envelope.slice,
+ filtered = true,
+ source = envelope.source,
+ tags = envelope.tags)
+ })(ExecutionContexts.parasitic)
+
+ override def extractOffset(envelope: EventEnvelope[Event]): OffsetType = actual.extractOffset(envelope)
+
+ override def extractCreationTime(envelope: EventEnvelope[Event]): Long = actual.extractCreationTime(envelope)
+}