Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Client.Watch inside Client.Attach #803

Merged
merged 5 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 63 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os"
"path/filepath"
"strings"
"sync/atomic"

"connectrpc.com/connect"
"github.com/rs/xid"
Expand Down Expand Up @@ -72,12 +73,22 @@ var (

// ErrInitializationNotReceived occurs when the first response of the watch stream is not received.
ErrInitializationNotReceived = errors.New("initialization is not received")

// ErrAlreadySubscribed occurs when the client is already subscribed to the document.
ErrAlreadySubscribed = errors.New("already subscribed")
)

// Attachment represents the document attached.
type Attachment struct {
doc *document.Document
docID types.ID

// TODO(krapie): assuming that a client do not open multiple subscriptions for the same document.
isSubscribed atomic.Bool
krapie marked this conversation as resolved.
Show resolved Hide resolved

rch <-chan WatchResponse
watchCtx context.Context
closeWatchStream context.CancelFunc
}

// Client is a normal client that can communicate with the server.
Expand Down Expand Up @@ -322,6 +333,15 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
docID: types.ID(res.Msg.DocumentId),
}

watchCtx, cancelFunc := context.WithCancel(ctx)
c.attachments[doc.Key()].watchCtx = watchCtx
c.attachments[doc.Key()].closeWatchStream = cancelFunc

err = c.runWatchLoop(watchCtx, doc)
if err != nil {
return err
}

return nil
}

Expand All @@ -346,6 +366,8 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ...
return ErrDocumentNotAttached
}

attachment.closeWatchStream()

if err := doc.Update(func(root *json.Object, p *presence.Presence) error {
p.Clear()
return nil
Expand Down Expand Up @@ -406,21 +428,37 @@ func (c *Client) Sync(ctx context.Context, options ...SyncOptions) error {
return nil
}

// Watch subscribes to events on a given documentIDs.
// Subscribe subscribes to events on a given document.
func (c *Client) Subscribe(
doc *document.Document,
) (<-chan WatchResponse, context.CancelFunc, error) {
attachment, ok := c.attachments[doc.Key()]
if !ok {
return nil, nil, ErrDocumentNotAttached
}

if attachment.isSubscribed.Load() {
return nil, nil, ErrAlreadySubscribed
}
attachment.isSubscribed.Store(true)
krapie marked this conversation as resolved.
Show resolved Hide resolved

return attachment.rch, attachment.closeWatchStream, nil
}

// runWatchLoop subscribes to events on a given documentIDs.
// If an error occurs before stream initialization, the second response, error,
// is returned. If the context "ctx" is canceled or timed out, returned channel
// is returned. If the context "watchCtx" is canceled or timed out, returned channel
// is closed, and "WatchResponse" from this closed channel has zero events and
// nil "Err()".
func (c *Client) Watch(
func (c *Client) runWatchLoop(
ctx context.Context,
doc *document.Document,
) (<-chan WatchResponse, error) {
) error {
attachment, ok := c.attachments[doc.Key()]
if !ok {
return nil, ErrDocumentNotAttached
return ErrDocumentNotAttached
}

rch := make(chan WatchResponse)
stream, err := c.client.WatchDocument(
ctx,
withShardKey(connect.NewRequest(&api.WatchDocumentRequest{
Expand All @@ -429,40 +467,52 @@ func (c *Client) Watch(
},
), c.options.APIKey, doc.Key().String()))
if err != nil {
return nil, err
return err
}

// NOTE(hackerwins): We need to receive the first response to initialize
// the watch stream. Watch should be blocked until the first response is
// the watch stream. runWatchLoop should be blocked until the first response is
// received.
if !stream.Receive() {
return nil, ErrInitializationNotReceived
return ErrInitializationNotReceived
}
if _, err := handleResponse(stream.Msg(), doc); err != nil {
return nil, err
return err
}
if err = stream.Err(); err != nil {
return nil, err
return err
}

rch := make(chan WatchResponse)
attachment.rch = rch

go func() {
for stream.Receive() {
pbResp := stream.Msg()
resp, err := handleResponse(pbResp, doc)
if err != nil {
rch <- WatchResponse{Err: err}
ctx.Done()
close(rch)
return
}
if resp == nil {
if resp == nil || !attachment.isSubscribed.Load() {
continue
}

rch <- *resp
}
if err = stream.Err(); err != nil {
attachment.isSubscribed.Store(false)
rch <- WatchResponse{Err: err}
ctx.Done()
close(rch)

// If watch stream is disconnected, we re-establish the watch stream.
err = c.runWatchLoop(ctx, doc)
if err != nil {
return
}
return
}
}()
Expand Down Expand Up @@ -504,7 +554,7 @@ func (c *Client) Watch(
}
}()

return rch, nil
return nil
}

func handleResponse(
Expand Down
4 changes: 2 additions & 2 deletions test/bench/grpc_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ func BenchmarkRPC(b *testing.B) {
})
assert.NoError(b, err)

rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(b, err)
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(b, err)

done1 := make(chan bool)
Expand Down
5 changes: 2 additions & 3 deletions test/integration/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,14 @@ func TestAdmin(t *testing.T) {

t.Run("document event propagation on removal test", func(t *testing.T) {
ctx := context.Background()
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()

// 01. c1 attaches and watches d1.
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
wg := sync.WaitGroup{}
wg.Add(1)
rch, err := c1.Watch(watchCtx, d1)
rch, cancel, err := c1.Subscribe(d1)
defer cancel()
assert.NoError(t, err)
go func() {
defer wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion test/integration/auth_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestProjectAuthWebhook(t *testing.T) {
err = cli.Attach(ctx, doc)
assert.Equal(t, connect.CodeUnauthenticated, connect.CodeOf(err))

_, err = cli.Watch(ctx, doc)
_, _, err = cli.Subscribe(doc)
assert.Equal(t, client.ErrDocumentNotAttached, err)
})
}
Expand Down
42 changes: 19 additions & 23 deletions test/integration/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestDocument(t *testing.T) {
ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))

_, err := c1.Watch(ctx, d1)
_, _, err := c1.Subscribe(d1)
assert.ErrorIs(t, err, client.ErrDocumentNotAttached)

err = c1.Attach(ctx, d1)
Expand All @@ -163,8 +163,9 @@ func TestDocument(t *testing.T) {

// 01. cli1 watches doc1.
wg.Add(1)
rch, err := c1.Watch(ctx, d1)
rch, _, err := c1.Subscribe(d1)
assert.NoError(t, err)

go func() {
defer wg.Done()

Expand Down Expand Up @@ -402,23 +403,21 @@ func TestDocument(t *testing.T) {

t.Run("removed document removal with watching test", func(t *testing.T) {
ctx := context.Background()
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()

// 01. c1 creates d1 without attaching.
d1 := document.New(helper.TestDocKey(t))
_, err := c1.Watch(watchCtx, d1)
_, _, err := c1.Subscribe(d1)
assert.ErrorIs(t, err, client.ErrDocumentNotAttached)

// 02. c1 attaches d1 and watches it.
assert.NoError(t, c1.Attach(ctx, d1))
_, err = c1.Watch(watchCtx, d1)
_, _, err = c1.Subscribe(d1)
assert.NoError(t, err)

// 03. c1 removes d1 and watches it.
assert.NoError(t, c1.Remove(ctx, d1))
assert.Equal(t, d1.Status(), document.StatusRemoved)
_, err = c1.Watch(watchCtx, d1)
_, _, err = c1.Subscribe(d1)
assert.ErrorIs(t, err, client.ErrDocumentNotAttached)
})

Expand All @@ -437,19 +436,19 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)
d2.SubscribeBroadcastEvent("mention", handler)

d3 := document.New(helper.TestDocKey(t))
assert.NoError(t, c3.Attach(ctx, d3))
rch3, err := c3.Watch(ctx, d3)
rch3, _, err := c3.Subscribe(d3)
assert.NoError(t, err)
d3.SubscribeBroadcastEvent("mention", handler)

Expand Down Expand Up @@ -499,13 +498,13 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)
d2.SubscribeBroadcastEvent("mention", handler)

Expand Down Expand Up @@ -557,14 +556,14 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

// c2 doesn't subscribe to the "mention" broadcast event.
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)

// The unsubscriber c2 broadcasts the "mention" event.
Expand Down Expand Up @@ -603,7 +602,7 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
_, err := c1.Watch(ctx, d1)
_, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", nil)

Expand All @@ -626,13 +625,13 @@ func TestDocument(t *testing.T) {

d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
rch1, err := c1.Watch(ctx, d1)
rch1, _, err := c1.Subscribe(d1)
assert.NoError(t, err)
d1.SubscribeBroadcastEvent("mention", handler)

d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
rch2, err := c2.Watch(ctx, d2)
rch2, _, err := c2.Subscribe(d2)
assert.NoError(t, err)
d2.SubscribeBroadcastEvent("mention", handler)

Expand Down Expand Up @@ -711,9 +710,8 @@ func TestDocumentWithProjects(t *testing.T) {
d1 := document.New(helper.TestDocKey(t))
err = c1.Attach(ctx, d1)
assert.NoError(t, err)
watch1Ctx, cancel1 := context.WithCancel(ctx)
rch, cancel1, err := c1.Subscribe(d1)
defer cancel1()
rch, err := c1.Watch(watch1Ctx, d1)
assert.NoError(t, err)

go func() {
Expand Down Expand Up @@ -750,8 +748,7 @@ func TestDocumentWithProjects(t *testing.T) {
})
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))
watch2Ctx, cancel2 := context.WithCancel(ctx)
_, err = c2.Watch(watch2Ctx, d2)
_, cancel2, err := c2.Subscribe(d2)
assert.NoError(t, err)

// c2 updates the document, so c1 receives a documents changed event.
Expand All @@ -764,8 +761,7 @@ func TestDocumentWithProjects(t *testing.T) {
// d3 is in another project, so c1 and c2 should not receive events.
d3 := document.New(helper.TestDocKey(t))
assert.NoError(t, c3.Attach(ctx, d3))
watch3Ctx, cancel3 := context.WithCancel(ctx)
_, err = c3.Watch(watch3Ctx, d3)
_, cancel3, err := c3.Subscribe(d3)
assert.NoError(t, err)
assert.NoError(t, d3.Update(func(root *json.Object, p *presence.Presence) error {
root.SetString("key3", "value3")
Expand Down
Loading
Loading