Skip to content

Commit

Permalink
Improve UpdateMetadata API
Browse files Browse the repository at this point in the history
The order of publishing after calling PubSub.UpdateSubscriber in
UpdateMetadata should be synchronized. When requests from the same
client are requested at the same time, the final result can be matched.
  • Loading branch information
dc7303 committed May 5, 2021
1 parent 9d495e5 commit 6c8869d
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
77 changes: 77 additions & 0 deletions test/integration/client_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// +build integration
// +build !race

/*
* Copyright 2020 The Yorkie Authors. All rights reserved.
Expand All @@ -21,6 +22,7 @@ package integration
import (
"context"
"io"
"strconv"
"sync"
"testing"

Expand Down Expand Up @@ -143,4 +145,79 @@ func TestClient(t *testing.T) {

wg.Wait()
})

t.Run("update metadata at the same time test", func(t *testing.T) {
ctx := context.Background()

clients := createActivatedClients(t, 2)
c1 := clients[0]
c2 := clients[1]
defer func() {
cleanupClients(t, clients)
}()

doc := document.New(helper.Collection, t.Name())
err := c1.Attach(ctx, doc)
assert.NoError(t, err)

err = c2.Attach(ctx, doc)
assert.NoError(t, err)

wg := sync.WaitGroup{}
rch := c1.Watch(ctx, doc)

var expectedChan = make(chan map[string]string)
var actualChan = make(chan map[string]string)
var expected map[string]string
var actual map[string]string

go func() {
for {
select {
case e := <-expectedChan:
expected = e
wg.Done()
case a := <-actualChan:
actual = a
wg.Done()
}
}
}()
go func() {
for {
select {
case <-ctx.Done():
assert.Fail(t, "unexpected ctx done")
case resp := <-rch:
errCode := status.Code(resp.Err)
if resp.Err == io.EOF || errCode == codes.Canceled || errCode == codes.Unavailable {
return
}
assert.NoError(t, resp.Err)

if resp.EventType == types.ClientChangedEvent {
actualChan <- resp.Publisher.Metadata
}
}
}
}()

c2.Watch(ctx, doc)

for i := 0; i < 5; i++ {
wg.Add(2)
go func(idx int) {
metadata := map[string]string{
"idx": strconv.Itoa(idx),
}
err = c2.UpdateMetadata(ctx, metadata)
assert.NoError(t, err)
expectedChan <- metadata
}(i)
}

wg.Wait()

assert.Equal(t, expected, actual)
})
}
20 changes: 19 additions & 1 deletion yorkie/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (s *Server) PushPull(
// corresponding to the client whose metadata is to be changed,
// and notifies other Subscribers of the changed client information.
func (s *Server) UpdateMetadata(
_ context.Context,
ctx context.Context,
req *api.UpdateMetadataRequest,
) (*api.UpdateMetadataResponse, error) {
client, err := converter.FromClient(req.Client)
Expand All @@ -371,6 +371,24 @@ func (s *Server) UpdateMetadata(
}

if len(docKeys) > 0 {
locker, err := s.backend.LockerMap.NewLocker(
ctx,
sync.NewKey(fmt.Sprintf("updateMetadata-%s", client.ID)),
)
if err != nil {
return nil, toStatusError(err)
}

if err := locker.Lock(ctx); err != nil {
log.Logger.Error(err)
return nil, toStatusError(err)
}
defer func() {
if err := locker.Unlock(ctx); err != nil {
log.Logger.Error(err)
}
}()

updatedDocKeys, err := s.backend.PubSub.UpdateSubscriber(*client, docKeys)
if err != nil {
log.Logger.Error(err)
Expand Down

0 comments on commit 6c8869d

Please sign in to comment.