Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DeferPublishUncommitted to message.Publisher #319

Merged
merged 1 commit into from
Apr 22, 2022
Merged

Conversation

psFried
Copy link
Contributor

@psFried psFried commented Apr 22, 2022

This is an experimental API to allow for publishing a message where the
content is not known until after the acknowledgement intents are needed.
This is a low level API to allow consumers a little more flexibility
when storing checkpoints in external systems, as Flow does for
materializations.


This change is Reviewable

@psFried psFried requested a review from jgraettinger April 22, 2022 14:15
Copy link
Contributor

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r1, all commit messages.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @psFried)


message/publisher.go, line 188 at r1 (raw file):

//
// **This is a new and unstable API, that is subject to breaking changes.**
func (p *Publisher) DeferPublishUncommitted(journal pb.Journal, contentType string, ack Message) (fut PendingPublish, err error) {

We're already required to pass in a Message for its ack-intents behavior. Should we also use it to determine journal and contentType as the regular PublishUncommitted does ? It would be up to the caller to fill out "enough" of the Message that that routing can be done, even if the final content of the Message isn't ready, but that seems fine and in-keeping with intended usages?

It does have further implications on the API though: should PendingPublish then capture the Message to be published, such that Resolve() doesn't require it by-argument ?

Seems like this might be simpler overall, since the caller has to have meaningful prior knowledge of what the Message instance is going to be anyway in order for the ACK intents behavior to work properly. The pattern would essentially be to allocate it and fill out what's known know, then start a deferred publish, then fill out the rest, then resolve.


message/publisher.go, line 204 at r1 (raw file):

		p.intents = append(p.intents, AckIntent{
			Journal: journal,
			msg:     ack,

I recognize this is explicitly documented to be an acknolwedgement, but think this should still probably do aack.NewAcknowledgement(...) as happens in PublishUncommitted.


message/publisher_test.go, line 218 at r1 (raw file):

}

func TestDeferPublishUncommitted(t *testing.T) {

great test!


message/publisher_test.go, line 244 at r1 (raw file):

		require.NoError(t, err)

		if seq.QueueUncommitted(env) == QueueAckCommitReplay {

nit: you can probably increase the ring buffer size and panic in this case unless you're deliberately wanting to exercise ring buffer semantics (which seem unrelated to the unit under test) ?

@psFried
Copy link
Contributor Author

psFried commented Apr 22, 2022

We're already required to pass in a Message for its ack-intents behavior. Should we also use it to determine journal and contentType as the regular PublishUncommitted does ?

The problem that I see with that is that Message doesn't specify any way to mutate it besides setting the uuid, which would make for a really awkward interface for Resolve. How would the caller mutate the message before it's written? Something like:

func (p *PendingPublish) Resolve(finishMessage func(Message) Message) error ...

would mean that the caller has to use a type assertion to get their concrete message type, then mutate it and return it. It's weird, too, because Message is an interface so you wouldn't normally pass *Message to mutate it, but would need to have finishMessage accept it by value and then return a (possibly different) Message. I guess we could pass *Message, but it feels like something that would be frowned upon. WDYT?

This is an experimental API to allow for publishing a message where the
content is not known until after the acknowledgement intents are needed.
This is a low level API to allow consumers a little more flexibility
when storing checkpoints in external systems, as Flow does for
materializations.
Copy link
Contributor Author

@psFried psFried left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 2 files reviewed, 3 unresolved discussions (waiting on @jgraettinger and @psFried)


message/publisher.go, line 188 at r1 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

We're already required to pass in a Message for its ack-intents behavior. Should we also use it to determine journal and contentType as the regular PublishUncommitted does ? It would be up to the caller to fill out "enough" of the Message that that routing can be done, even if the final content of the Message isn't ready, but that seems fine and in-keeping with intended usages?

It does have further implications on the API though: should PendingPublish then capture the Message to be published, such that Resolve() doesn't require it by-argument ?

Seems like this might be simpler overall, since the caller has to have meaningful prior knowledge of what the Message instance is going to be anyway in order for the ACK intents behavior to work properly. The pattern would essentially be to allocate it and fill out what's known know, then start a deferred publish, then fill out the rest, then resolve.

Done.


message/publisher_test.go, line 244 at r1 (raw file):

Previously, jgraettinger (Johnny Graettinger) wrote…

nit: you can probably increase the ring buffer size and panic in this case unless you're deliberately wanting to exercise ring buffer semantics (which seem unrelated to the unit under test) ?

good call, that was a bit of copypasta left over

@psFried psFried merged commit fa37e21 into master Apr 22, 2022
@psFried psFried deleted the phil/publish-fut branch April 22, 2022 21:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants