Skip to content

Commit

Permalink
manage watch stream of attachment in go routine
Browse files Browse the repository at this point in the history
  • Loading branch information
karockai committed Sep 19, 2023
1 parent 93ee546 commit cc29fac
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 209 deletions.
148 changes: 91 additions & 57 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/presence"
"github.com/yorkie-team/yorkie/pkg/document/time"
gotime "time"
)

type status int
Expand Down Expand Up @@ -69,8 +70,9 @@ var (

// Attachment represents the document attached.
type Attachment struct {
doc *document.Document
docID types.ID
doc *document.Document
docID types.ID
watchStream api.YorkieService_WatchDocumentClient
}

// Client is a normal client that can communicate with the server.
Expand Down Expand Up @@ -238,6 +240,12 @@ func (c *Client) Deactivate(ctx context.Context) error {
return err
}

for _, val := range c.attachments {
if err := val.cancelWatchStream(); err != nil {
return err
}
}

c.status = deactivated

return nil
Expand Down Expand Up @@ -376,9 +384,8 @@ func (c *Client) Detach(ctx context.Context, doc *document.Document, options ...
if doc.Status() != document.StatusRemoved {
doc.SetStatus(document.StatusDetached)
}
delete(c.attachments, doc.Key())

return nil
return c.detachInternal(doc.Key())
}

// Sync pushes local changes of the attached documents to the server and
Expand Down Expand Up @@ -414,18 +421,6 @@ func (c *Client) watch(
return nil, ErrDocumentNotAttached
}

rch := make(chan WatchResponse)
stream, err := c.client.WatchDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.WatchDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
},
)
if err != nil {
return nil, err
}

handleResponse := func(pbResp *api.WatchDocumentResponse) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
Expand Down Expand Up @@ -484,58 +479,71 @@ func (c *Client) watch(
return nil, ErrUnsupportedWatchResponseType
}

pbResp, err := stream.Recv()
if err != nil {
return nil, err
}
if _, err := handleResponse(pbResp); err != nil {
return nil, err
rch := make(chan WatchResponse, 3)

createStream := func() (api.YorkieService_WatchDocumentClient, error) {
result, err := c.client.WatchDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.WatchDocumentRequest{
ClientId: c.id.Bytes(),
DocumentId: attachment.docID.String(),
},
)
return result, err
}

go func() {
for {
pbResp, err := stream.Recv()
if err != nil {
rch <- WatchResponse{Err: err}
close(rch)
return
}
resp, err := handleResponse(pbResp)
if err != nil {
rch <- WatchResponse{Err: err}
close(rch)
return
}
if resp == nil {
continue
}

rch <- *resp
var err error
attachment.watchStream, err = createStream()
if err != nil {
return
}
}()

// TODO(hackerwins): We need to revise the implementation of the watch
// event handling. Currently, we are using the same channel for both
// document events and watch events. This is not ideal because the
// client cannot distinguish between document events and watch events.
// We'll expose only document events and watch events will be handled
// internally.

// TODO(hackerwins): We should ensure that the goroutine is closed when
// the stream is closed.
go func() {
for {
select {
case <-ctx.Done():
close(rch)
return
case e := <-doc.Events():
t := PresenceChanged
if e.Type == document.WatchedEvent {
t = DocumentWatched
_ = DocumentWatched
} else if e.Type == document.UnwatchedEvent {
t = DocumentUnwatched
_ = attachment.cancelWatchStream()
close(rch)
return
} else if e.Type == document.PresenceChangedEvent {
t := PresenceChanged
rch <- WatchResponse{Type: t, Presences: e.Presences}
}
rch <- WatchResponse{Type: t, Presences: e.Presences}
case <-ctx.Done():
return
continue
default:
if attachment.watchStream == nil {
gotime.Sleep(1 * gotime.Second)
attachment.watchStream, err = createStream()
if err != nil {
return
}
continue
}

pbResp, err := attachment.watchStream.Recv()

if err != nil {
attachment.watchStream = nil
continue
}
resp, err := handleResponse(pbResp)
if err != nil {
rch <- WatchResponse{Err: err}
_ = attachment.cancelWatchStream()
close(rch)
return
}
if resp == nil || resp.Type == DocumentUnwatched {
continue
}

rch <- *resp
}
}
}()
Expand Down Expand Up @@ -606,6 +614,7 @@ func (c *Client) pushPullChanges(ctx context.Context, opt SyncOptions) error {
return err
}
if attachment.doc.Status() == document.StatusRemoved {
_ = attachment.cancelWatchStream()
delete(c.attachments, attachment.doc.Key())
}

Expand Down Expand Up @@ -652,6 +661,31 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error {
if doc.Status() == document.StatusRemoved {
delete(c.attachments, doc.Key())
}
_ = attachment.cancelWatchStream()

return nil
}

func (c *Client) detachInternal(docKey key.Key) error {
attachment, ok := c.attachments[docKey]
if !ok {
return nil
}

if err := attachment.cancelWatchStream(); err != nil {
return err
}

delete(c.attachments, docKey)

return nil
}

func (a *Attachment) cancelWatchStream() error {
if a.watchStream != nil {
_ = a.watchStream.CloseSend()
}
a.watchStream = nil

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Document struct {
func New(key key.Key) *Document {
return &Document{
doc: NewInternalDocument(key),
events: make(chan DocEvent, 1),
events: make(chan DocEvent, 3),
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/backend/sync/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewSubscription(subscriber *time.ActorID) *Subscription {
return &Subscription{
id: xid.New().String(),
subscriber: subscriber,
events: make(chan DocEvent, 1),
events: make(chan DocEvent, 3),
}
}

Expand Down
16 changes: 3 additions & 13 deletions server/rpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ import (
"context"
"encoding/hex"
"fmt"
"log"
"os"
"testing"
"time"

"github.com/gogo/protobuf/types"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"log"
"os"
"testing"

"github.com/yorkie-team/yorkie/admin"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
Expand Down Expand Up @@ -683,14 +681,6 @@ func TestSDKRPCServerBackend(t *testing.T) {
// check if stream is open
_, err = watchResp.Recv()
assert.NoError(t, err)

// wait for MaxConnectionAge + MaxConnectionAgeGrace
time.Sleep(helper.RPCMaxConnectionAge + helper.RPCMaxConnectionAgeGrace)

// check if stream has closed by server (EOF)
_, err = watchResp.Recv()
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Contains(t, err.Error(), "EOF")
})
}

Expand Down
73 changes: 0 additions & 73 deletions test/integration/agent_test.go

This file was deleted.

Loading

0 comments on commit cc29fac

Please sign in to comment.