-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Write change event of DurableState to event journal #485
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks promising
slice, | ||
entityType, | ||
persistenceId, | ||
revision, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One difficult thing is that if change events are added to an existing durable state that had stored state before enabling change events (revision > 1) the journal will be missing sequence numbers for those earlier revisions. Maybe not a big deal, but I think the projection seqNr validation will reject such events when it doesn't have the previous offsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we just disallow that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that we would read latest sequence number from journal and thereby check that there is no gap? That has much overhead and adding caching and such is not trivial (as in the EventWriter).
I'm thinking that we might need something to allow starting from later offset without requiring previous for other reasons. E.g. came up in a question about migration from jdbc, or as soon as you have started to delete events and later add a new projection. I'm not sure if it would be some kind of relaxed offset validation or some kind of offset population job that can be run as a migration task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was more thinking disallow as in saying that won't work in docs, not necessarily any runtime checks.
f3b0c8f
to
405c448
Compare
I think this should be ready for final review now. Will require an Akka release before merging. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good but I wonder about eager dependency on journal.
private[r2dbc] class PostgresDurableStateDao( | ||
settings: R2dbcSettings, | ||
connectionFactory: ConnectionFactory, | ||
journalDao: JournalDao)(implicit ec: ExecutionContext, system: ActorSystem[_]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be worth making the journal-dao creation/dependency more lazy, for use cases where only using "regular" durable state without the change events, this probably means picking up/requiring journal config and maybe even more runtime stuff?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two options:
- reuse the same instance of the JournalDao inside the PostgresDialect
- pass in the dialect as parmeter here so that it can create the JournalDao on demand
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now we create a separate instance when constructing the durable state dao, can we instead pass a factory lambda and have the journalDao as a lazy field initialized on first use (if that ever happens)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That factory lambda is pretty much the same as passing in the dialect, which has the factory method. Ok, I'll change to that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed in b0b535d
store.upsertObject(persistenceId, revision = 2L, updatedValue, tag, s"Changed to $updatedValue").futureValue | ||
|
||
// simulate an update by a different node that didn't see the first one: | ||
val updatedValue2 = "Genuine and Sincere in all Communications" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great values!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM (once we released upstream Akka)
@johanandren I had missed the h2 dialect override and made a mistake of the "in same transaction". Can you take a look at the changes in fbe67e0 before we merge. |
Early draft to show where the idea is going.
TODO: