Skip to content

Commit

Permalink
Add DeferPublishUncommitted to message.Publisher
Browse files Browse the repository at this point in the history
This is an experimental API to allow for publishing a message where the
content is not known until after the acknowledgement intents are needed.
This is a low level API to allow consumers a little more flexibility
when storing checkpoints in external systems, as Flow does for
materializations.
  • Loading branch information
psFried committed Apr 22, 2022
1 parent 443ab90 commit 8584912
Show file tree
Hide file tree
Showing 2 changed files with 204 additions and 0 deletions.
85 changes: 85 additions & 0 deletions message/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,91 @@ func (p *Publisher) PublishUncommitted(mapping MappingFunc, msg Message) (*clien
return aa, nil
}

// PendingPublish is returned from DeferPublishUncommitted, and allows appending a single message
// that had previously been sequenced. It captures everything Resolve needs in order to append
// that message later: the Publisher to append through, the target journal, the content type used
// to select a Framing, and the UUID that was built when the publish was deferred.
//
// **This is a new and unstable API, that is subject to breaking changes.**
type PendingPublish struct {
// publisher is the Publisher whose append service Resolve writes through. It is nil for a
// zero-valued PendingPublish, and Resolve sets it to nil after appending so that a second
// call can be detected.
publisher *Publisher
// journal is the journal to which Resolve appends the message.
journal pb.Journal
// contentType is passed to FramingByContentType by Resolve to pick the message framing.
contentType string
// uuid is the UUID built by DeferPublishUncommitted, which Resolve sets on the message.
uuid UUID
}

// Resolve finishes a PendingPublish by appending |msg| under the UUID that was reserved when
// the publish was deferred. See the DeferPublishUncommitted docs for when this may be called.
//
// If |msg| implements Validator it is validated first, and a validation error is returned
// without appending anything. An error is likewise returned if no Framing is found for the
// content type given at defer time. Otherwise the message is marshalled into an append to the
// deferred journal, and the result of releasing that append is returned.
//
// Resolve panics if the PendingPublish has no Publisher, which is the case both after a
// previous Resolve has appended and for a zero-valued PendingPublish.
//
// **This is a new and unstable API, that is subject to breaking changes.**
func (pf *PendingPublish) Resolve(msg Message) error {
	// A missing publisher means that either Resolve already ran to completion, or this is the
	// zero value that DeferPublishUncommitted hands back alongside an error.
	if pf.publisher == nil {
		panic("Pending publish has already been resolved")
	}

	if validator, canValidate := msg.(Validator); canValidate {
		if invalid := validator.Validate(); invalid != nil {
			return invalid
		}
	}
	// Stamp the message with the UUID built when the publish was deferred.
	msg.SetUUID(pf.uuid)

	var framing, framingErr = FramingByContentType(pf.contentType)
	if framingErr != nil {
		return framingErr
	}

	var request = pb.AppendRequest{Journal: pf.journal}
	var appendOp = pf.publisher.ajc.StartAppend(request, nil)

	// Marshal straight into the append's writer; any marshalling error is handed to Require.
	var marshalErr = framing.Marshal(msg, appendOp.Writer())
	appendOp.Require(marshalErr)

	var releaseErr = appendOp.Release()
	// Drop the publisher so that a repeated Resolve trips the sanity check at the top.
	pf.publisher = nil
	return releaseErr
}

// DeferPublishUncommitted sequences a message now whose content will only be supplied later,
// at some point before the end of the transaction. It returns a PendingPublish, and the caller
// completes the publish by passing the actual message to PendingPublish.Resolve. This exists
// for cases where a message must be published transactionally, but its content is not
// available until after the ack intents have been built. It is an advanced, low level API, and
// it must be used carefully to avoid corrupting journal content.
//
// Unlike PublishUncommitted there is no message to derive things from, so the caller supplies
// the journal, the contentType, and the acknowledgement Message directly, and is responsible
// for making sure they are correct and consistent with one another. Note that |ack| is only
// recorded if this is the first publish to |journal| since the last commit; if an intent for
// the journal is already tracked, |ack| is not used.
//
// A PendingPublish never has to be resolved, and may simply be dropped with no harm done. If
// Resolve is called, it must be called _before_ the acknowledgements are written, since
// otherwise ReadCommitted consumers will ignore the resolved message. Resolving a
// PendingPublish is also not safe to do concurrently with other uses of the Publisher.
//
// If FramingByContentType returns an error for |contentType|, that error is returned together
// with a zero-valued PendingPublish, which must not be resolved.
//
// **This is a new and unstable API, that is subject to breaking changes.**
func (p *Publisher) DeferPublishUncommitted(journal pb.Journal, contentType string, ack Message) (fut PendingPublish, err error) {
	if p.autoUpdate {
		p.clock.Update(time.Now())
	}

	var framing, framingErr = FramingByContentType(contentType)
	if framingErr != nil {
		return PendingPublish{}, framingErr
	}

	// Tick the clock and build the UUID now; Resolve later stamps it onto the real message.
	var reserved = BuildUUID(p.producer, p.clock.Tick(), Flag_CONTINUE_TXN)

	// Only the first publish to a journal since the last commit registers an ack intent.
	if _, tracked := p.intentIdx[journal]; !tracked {
		var intent = AckIntent{
			Journal: journal,
			msg:     ack,
			framing: framing,
		}
		p.intentIdx[journal] = len(p.intents)
		p.intents = append(p.intents, intent)
	}

	fut = PendingPublish{
		publisher:   p,
		journal:     journal,
		contentType: contentType,
		uuid:        reserved,
	}
	return fut, nil
}

// BuildAckIntents returns the []AckIntents which acknowledge all pending
// Messages published since its last invocation. It's the caller's job to
// actually append the intents to their respective journals, and only *after*
Expand Down
119 changes: 119 additions & 0 deletions message/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,125 @@ func TestIntegrationOfPublisherWithSequencerAndReader(t *testing.T) {
require.NoError(t, bk.Tasks.Wait())
}

// TestDeferPublishUncommitted runs DeferPublishUncommitted and PendingPublish.Resolve against a
// test broker, and checks which messages a Sequencer dequeues depending on when the deferred
// publish was sequenced and resolved relative to other publishes and to the ack intents.
func TestDeferPublishUncommitted(t *testing.T) {
var etcd = etcdtest.TestClient()
defer etcdtest.Cleanup()

var (
clock Clock
ctx = context.Background()
spec = newTestMsgSpec("a/journal")
bk = brokertest.NewBroker(t, etcd, "local", "broker")
ajc = client.NewAppendService(ctx, bk.Client())
)
brokertest.CreateJournals(t, bk, spec)

// Start a long-lived RetryReader of |spec|.
var rr = client.NewRetryReader(ctx, bk.Client(), pb.ReadRequest{
Journal: spec.Name,
Block: true,
})
var r = NewReadUncommittedIter(rr, newTestMsg)

var seq = NewSequencer(nil, nil, 5)

// seqPump reads exactly one message from |r| and queues it with the sequencer, starting a
// replay read of the range the sequencer asks for when it reports QueueAckCommitReplay. It then
// steps the sequencer until io.EOF and returns every message dequeued along the way. The result
// is nil when nothing was dequeued. Because each call consumes one message from the journal,
// the number and order of seqPump calls below must match the messages actually written.
var seqPump = func() (out []testMsg) {
var env, err = r.Next()
require.NoError(t, err)

if seq.QueueUncommitted(env) == QueueAckCommitReplay {
var journal, from, to = seq.ReplayRange()
var rr = client.NewRetryReader(ctx, bk.Client(), pb.ReadRequest{
Journal: journal,
Offset: from,
EndOffset: to,
})
seq.StartReplay(NewReadUncommittedIter(rr, newTestMsg))
}
for {
if err := seq.Step(); err == io.EOF {
return
}
require.NoError(t, err)
out = append(out, *seq.Dequeued.Message.(*testMsg))
}
}

// mapping sends every message to |spec|'s journal with the labels.ContentType_JSONLines
// content type, matching what is passed directly to DeferPublishUncommitted below.
var mapping = func(Mappable) (pb.Journal, string, error) {
return spec.Name, labels.ContentType_JSONLines, nil
}
var pub = NewPublisher(ajc, &clock)

// Happy path: An uncommitted message can be written before a deferred one, and should get
// sequenced normally with respect to the deferred message, since the deferred publish is
// started after.
var _, err = pub.PublishUncommitted(mapping, &testMsg{Str: "one"})
require.NoError(t, err)
// "one" is read, but nothing is dequeued while it is still uncommitted.
require.Equal(t, []testMsg(nil), seqPump())

fut, err := pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg))
require.NoError(t, err)

// Build the ack intents before the deferred message's content is supplied, which is the
// ordering this API exists to support.
intents, err := pub.BuildAckIntents()
require.NoError(t, err)

// Resolve the deferred publish before the intents are written. It is read, but still
// nothing is dequeued.
require.NoError(t, fut.Resolve(&testMsg{Str: "two"}))
require.Equal(t, []testMsg(nil), seqPump())

writeIntents(t, ajc, intents)

// Reading the written intent releases "one" and "two", followed by a message with an empty
// Str (presumably the acknowledgement itself).
var actual = seqPump()
require.Equal(t, 3, len(actual))
require.Equal(t, "one", actual[0].Str)
require.Equal(t, "two", actual[1].Str)
require.Equal(t, "", actual[2].Str)

// Sad path cases:
// The deferred publish message will not be seen because it sequences before "three"
// (its UUID is built first), yet it is only appended to the journal after "three".
fut, err = pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg))
require.NoError(t, err)

_, err = pub.PublishUncommitted(mapping, &testMsg{Str: "three"})
require.NoError(t, err)
require.Equal(t, []testMsg(nil), seqPump())
intents, err = pub.BuildAckIntents()
require.NoError(t, err)
require.NoError(t, fut.Resolve(&testMsg{Str: "wont see four"}))
require.Equal(t, []testMsg(nil), seqPump())

writeIntents(t, ajc, intents)
// Only "three" and the empty-Str message come out; "wont see four" is never dequeued.
actual = seqPump()
require.Equal(t, 2, len(actual))
require.Equal(t, "three", actual[0].Str)
require.Equal(t, "", actual[1].Str)

// The deferred publish isn't resolved until after the acks were written, so will not be seen.
_, err = pub.PublishUncommitted(mapping, &testMsg{Str: "five"})
require.NoError(t, err)
require.Equal(t, []testMsg(nil), seqPump())

fut, err = pub.DeferPublishUncommitted(spec.Name, labels.ContentType_JSONLines, new(testMsg))
require.NoError(t, err)

intents, err = pub.BuildAckIntents()
require.NoError(t, err)
writeIntents(t, ajc, intents)

// The intent commits "five" alone; the deferred message has not been written yet.
actual = seqPump()
require.Equal(t, 2, len(actual))
require.Equal(t, "five", actual[0].Str)
require.Equal(t, "", actual[1].Str)

// Resolving now succeeds and the message is read from the journal, but it is not dequeued.
require.NoError(t, fut.Resolve(&testMsg{Str: "wont see six"}))
require.Equal(t, []testMsg(nil), seqPump())

// A later committed publish is dequeued on its own, and "wont see six" does not come out
// with it.
_, err = pub.PublishCommitted(mapping, &testMsg{Str: "seven"})
require.NoError(t, err)
actual = seqPump()
require.Equal(t, 1, len(actual))
require.Equal(t, "seven", actual[0].Str)
}

func readAllMsgs(t require.TestingT, bk *brokertest.Broker, spec *pb.JournalSpec) (out []testMsg) {
var rr = client.NewRetryReader(context.Background(), bk.Client(), pb.ReadRequest{Journal: spec.Name})
var r = NewReadUncommittedIter(rr, newTestMsg)
Expand Down

0 comments on commit 8584912

Please sign in to comment.