-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Producer initiated pushing of events over gRPC #932
Conversation
akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_consumer.proto
Outdated
Show resolved
Hide resolved
...-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
Outdated
Show resolved
Hide resolved
45e023f
to
52d8b0c
Compare
akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilteredPayload.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/producer/ProducerPushSpec.scala
Outdated
Show resolved
Hide resolved
oneof message { | ||
// always sent first | ||
ConsumerEventInit init = 1; | ||
Event event = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in event_producer.proto
we have Event
and FilteredEvent
. Should we use FilteredEvent
here too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That would make the protocol more clear yeah, I'll do that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed in 80d67c0
} | ||
.via(ActorFlow.askWithStatus(8)(eventWriter) { | ||
(in: proto.Event, replyTo: ActorRef[StatusReply[EventWriter.WriteAck]]) => | ||
// FIXME would we want request metadata/producer ip/entire envelope in to persistence id transformer? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, one case could be that based on the origin of the event you want to adjust the persistenceId to include that origin (for example making it more unique)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another thing that could be good to amend before writing the event would be the tags, e.g. adding a tag for the origin, which can then be used in filtering later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could have a transformation function of the Entire EventEnvelope, but we shouldn't allow changing the seqNr
btw, would then probably be nice with some "copy methods" on the EventEnvelope, such as withTags
, withPersistenceId
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we could accept a transform but always check that sequence number was not changed. Or more a more limited transform API instead of adding API to let users actually transform the entire envelope, Transform.addTags()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a try at a transform API but didn't include the origin as a parameter yet: fdb350e
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And now the transform can be created per origin/event stream based on origin id and request metadata fda1176
akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilteredPayload.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilteredPayload.scala
Outdated
Show resolved
Hide resolved
// FIXME config for parallelism, and perPartition (aligned with event writer batch config) | ||
.mapAsyncPartitioned(1000, 20)(_.persistenceId) { (in, _) => | ||
// FIXME would we want request metadata/producer ip/entire envelope in to persistence id transformer? | ||
val envelope = ProtobufProtocolConversions.eventToEnvelope[Any](in, protoAnySerialization) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good if we can skip the deserialization-serialization roundtrip here, and that may impact what kind of transformer we want to support. If the transformer is not changing the payload we can avoid the serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just looked briefly at this again, let me know when it's time for a deeper look.
|
||
} | ||
}) | ||
.onFailure[Exception](SupervisorStrategy.restart) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have backoff restart, because if it starts failing we don't want to overload the db?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly not quite sure how this would crash, other than some bug in the batching logic, errors persisting events are fed back to the sender and shouldn't crash the writer itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, we are not failing the writer actor when we there are db errors. The CircuitBreaker in AsyncWriteJournal
will protect the db from overload when it's already in trouble. Ok, we can leave it as plain restart
.
akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/persistence/EventWriter.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConsumerSerializer.scala
Outdated
Show resolved
Hide resolved
@patriknw the largest task that is left is to come up with proper user API, and once that is in place, break out the sample I started on to be able to test it properly, and some docs. I think you can review the internal bits and pieces a bit more if you have time. Possible small-to-medium things left, not sure if we need all right now or can iterate:
|
b426abe
to
4f5e427
Compare
@patriknw WDYT is this producer API simple enough while being flexible enough that you can run it in a non-clustered actor system to just push events for one or a few entities, but ofc also in a cluster with SDP to push a larger number of entities. Or do we need something higher level like we did with the |
akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/EventConsumer.scala
Outdated
Show resolved
Hide resolved
...-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala
Outdated
Show resolved
Hide resolved
...-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala
Outdated
Show resolved
Hide resolved
...-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventConsumerServiceImpl.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusher.scala
Outdated
Show resolved
Hide resolved
...jection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/ActiveEventProducer.scala
Outdated
Show resolved
Hide resolved
} | ||
} | ||
|
||
case JournalProtocol.WriteMessageFailure(message, error, `actorInstanceId`) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How should we handle duplicates, which will result in write failures? We will have duplicates since it's at-least-once.
One idea is to optimistically write, but if it fails we can check if the pid/seqNr exists with EventTimestampQuery
or LoadEventQuery
, or even eventsByPersistenceId
since we are interested in more than one in the batch.
Not great to add a dependency to readJournalPluginId too, so maybe we can do this with the journal protocol?
asyncReadHighestSequenceNr is not exposed as journal protocol message, so if we don't want to change Akka we could use ReplayMessages
to find the highest sequence number.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, and the exception is from the plugin, so there is no way to determine that it is a duplicate-write from the failure response. Replay seems like a good trick, because we could do fromSequenceNr = toSequenceNr just for that seqNr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception is from plugin, but we can't see the difference of duplicate error to any other error, at this level.
Some plugins may not fail on duplicates, such as Cassandra, but that is another story.
and I think you know which other seqNrs are pending from the same batch, and it's very likely that they will also fail. Then it's probably easier to use WriteMessagesFailure to trigger these checks. We know that all or none will fail because they are in AtomicWrite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
EventWriter...
akka-projection-grpc/src/main/scala/akka/persistence/typed/EventWriter.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc-tests/src/it/scala/akka/persistence/typed/EventWriterSpec.scala
Outdated
Show resolved
Hide resolved
akka-projection-grpc-tests/src/it/scala/akka/persistence/typed/EventWriterSpec.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadsl looking good
...grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/EventProducerPushDestination.scala
Outdated
Show resolved
Hide resolved
that can wait until later, probably a rather big task, if we think about all cases where it would be nice |
This needs an Akka patch release with the event writer, but then I think we can merge and iterate on top. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
.registerTagMapper[String](_ => Set("added-tag")) | ||
.registerPayloadMapper[String, String](env => env.eventOption.map(_.toUpperCase)) | ||
} | ||
.withConsumerFilters(Vector(ConsumerFilter.ExcludeEntityIds(Set(consumerFilterExcludedPid.id)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good
Superseedes #920