Skip to content

Commit

Permalink
Fix duplicate changes when attaching and detaching
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins committed Jun 13, 2024
1 parent 23b3662 commit 14c9b47
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 26 deletions.
34 changes: 27 additions & 7 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ func SnapshotKey(projectID types.ID, docKey key.Key) sync.Key {
return sync.NewKey(fmt.Sprintf("snapshot-%s-%s", projectID, docKey))
}

// PushPullOptions represents the options for PushPull.
type PushPullOptions struct {
// Mode represents the sync mode.
Mode types.SyncMode

// Status represents the status of the document to be updated.
Status document.StatusType
}

// PushPull stores the given changes and returns accumulated changes of the
// given document.
//
Expand All @@ -59,7 +68,7 @@ func PushPull(
clientInfo *database.ClientInfo,
docInfo *database.DocInfo,
reqPack *change.Pack,
mode types.SyncMode,
opts PushPullOptions,
) (*ServerPack, error) {
start := gotime.Now()
defer func() {
Expand All @@ -76,20 +85,31 @@ func PushPull(
be.Metrics.AddPushPullReceivedOperations(reqPack.OperationsLen())

// 02. pull pack: pull changes or a snapshot from the database and create a response pack.
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, mode)
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, opts.Mode)
if err != nil {
return nil, err
}
be.Metrics.AddPushPullSentChanges(respPack.ChangesLen())
be.Metrics.AddPushPullSentOperations(respPack.OperationsLen())
be.Metrics.AddPushPullSnapshotBytes(respPack.SnapshotLen())

// 03. update the client's document and checkpoint.
docRefKey := docInfo.RefKey()
if err := clientInfo.UpdateCheckpoint(docRefKey.DocID, respPack.Checkpoint); err != nil {
return nil, err
if opts.Status == document.StatusRemoved {
if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}
} else if opts.Status == document.StatusDetached {
if err := clientInfo.DetachDocument(docInfo.ID); err != nil {
return nil, err
}
} else {
if err := clientInfo.UpdateCheckpoint(docRefKey.DocID, respPack.Checkpoint); err != nil {
return nil, err
}
}

// 03. store pushed changes, docInfo and checkpoint of the client to DB.
// 04. store pushed changes, docInfo and checkpoint of the client to DB.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
if err := be.DB.CreateChangeInfos(
ctx,
Expand All @@ -107,7 +127,7 @@ func PushPull(
return nil, err
}

// 04. update and find min synced ticket for garbage collection.
// 05. update and find min synced ticket for garbage collection.
// NOTE(hackerwins): Since the client could not receive the response, the
// requested seq(reqPack) is stored instead of the response seq(resPack).
minSyncedTicket, err := be.DB.UpdateAndFindMinSyncedTicket(
Expand Down Expand Up @@ -135,7 +155,7 @@ func PushPull(
minSyncedTicket.ToTestString(),
)

// 05. publish document change event then store snapshot asynchronously.
// 06. publish document change event then store snapshot asynchronously.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
be.Background.AttachGoroutine(func(ctx context.Context) {
publisherID, err := clientInfo.ID.ToActorID()
Expand Down
44 changes: 25 additions & 19 deletions server/rpc/yorkie_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
Expand Down Expand Up @@ -160,7 +161,10 @@ func (s *yorkieServer) AttachDocument(
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,18 +247,18 @@ func (s *yorkieServer) DetachDocument(
return nil, err
}

var status document.StatusType
if req.Msg.RemoveIfNotAttached && !isAttached {
pack.IsRemoved = true
if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}
status = document.StatusRemoved
} else {
if err := clientInfo.DetachDocument(docInfo.ID); err != nil {
return nil, err
}
status = document.StatusDetached
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: status,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,11 +320,6 @@ func (s *yorkieServer) PushPullChanges(
}()
}

syncMode := types.SyncModePushPull
if req.Msg.PushOnly {
syncMode = types.SyncModePushOnly
}

clientInfo, err := clients.FindActiveClientInfo(ctx, s.backend.DB, types.ClientRefKey{
ProjectID: project.ID,
ClientID: types.IDFromActorID(actorID),
Expand All @@ -342,7 +341,15 @@ func (s *yorkieServer) PushPullChanges(
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, syncMode)
syncMode := types.SyncModePushPull
if req.Msg.PushOnly {
syncMode = types.SyncModePushOnly
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: syncMode,
Status: document.StatusAttached,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -531,11 +538,10 @@ func (s *yorkieServer) RemoveDocument(
return nil, err
}

if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusRemoved,
})
if err != nil {
return nil, err
}
Expand Down
38 changes: 38 additions & 0 deletions test/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,42 @@ func TestClient(t *testing.T) {

wg.Wait()
})

t.Run("concurrent sync and detach test", func(t *testing.T) {
clients := activeClients(t, 2)
defer deactivateAndCloseClients(t, clients)
c1, c2 := clients[0], clients[1]

ctx := context.Background()
d1 := document.New(helper.TestDocKey(t))
assert.NoError(t, c1.Attach(ctx, d1))
d2 := document.New(helper.TestDocKey(t))
assert.NoError(t, c2.Attach(ctx, d2))

assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
root.SetNewArray("array").AddInteger(1, 2)
return nil
}))
assert.NoError(t, c1.Sync(ctx))

assert.NoError(t, d1.Update(func(root *json.Object, p *presence.Presence) error {
root.GetArray("array").AddInteger(3)
return nil
}))

wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
assert.NoError(t, c1.Sync(ctx))
}()
go func() {
defer wg.Done()
assert.NoError(t, c1.Detach(ctx, d1))
}()
wg.Wait()

assert.NoError(t, c2.Sync(ctx))
assert.Equal(t, d1.Marshal(), d2.Marshal())
})
}

0 comments on commit 14c9b47

Please sign in to comment.