Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicate changes when syncing and detaching #896

Merged
merged 2 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ func SnapshotKey(projectID types.ID, docKey key.Key) sync.Key {
return sync.NewKey(fmt.Sprintf("snapshot-%s-%s", projectID, docKey))
}

// PushPullOptions represents the options for PushPull.
type PushPullOptions struct {
// Mode represents the sync mode.
Mode types.SyncMode

// Status represents the status of the document to be updated.
Status document.StatusType
}

// PushPull stores the given changes and returns accumulated changes of the
// given document.
//
Expand All @@ -59,7 +68,7 @@ func PushPull(
clientInfo *database.ClientInfo,
docInfo *database.DocInfo,
reqPack *change.Pack,
mode types.SyncMode,
opts PushPullOptions,
) (*ServerPack, error) {
start := gotime.Now()
defer func() {
Expand All @@ -76,20 +85,31 @@ func PushPull(
be.Metrics.AddPushPullReceivedOperations(reqPack.OperationsLen())

// 02. pull pack: pull changes or a snapshot from the database and create a response pack.
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, mode)
respPack, err := pullPack(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq, opts.Mode)
if err != nil {
return nil, err
}
be.Metrics.AddPushPullSentChanges(respPack.ChangesLen())
be.Metrics.AddPushPullSentOperations(respPack.OperationsLen())
be.Metrics.AddPushPullSnapshotBytes(respPack.SnapshotLen())

// 03. update the client's document and checkpoint.
docRefKey := docInfo.RefKey()
if err := clientInfo.UpdateCheckpoint(docRefKey.DocID, respPack.Checkpoint); err != nil {
return nil, err
if opts.Status == document.StatusRemoved {
if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}
} else if opts.Status == document.StatusDetached {
if err := clientInfo.DetachDocument(docInfo.ID); err != nil {
return nil, err
}
} else {
if err := clientInfo.UpdateCheckpoint(docRefKey.DocID, respPack.Checkpoint); err != nil {
return nil, err
}
}

// 03. store pushed changes, docInfo and checkpoint of the client to DB.
// 04. store pushed changes, docInfo and checkpoint of the client to DB.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
if err := be.DB.CreateChangeInfos(
ctx,
Expand All @@ -107,7 +127,7 @@ func PushPull(
return nil, err
}

// 04. update and find min synced ticket for garbage collection.
// 05. update and find min synced ticket for garbage collection.
// NOTE(hackerwins): Since the client could not receive the response, the
// requested seq(reqPack) is stored instead of the response seq(resPack).
minSyncedTicket, err := be.DB.UpdateAndFindMinSyncedTicket(
Expand Down Expand Up @@ -135,7 +155,7 @@ func PushPull(
minSyncedTicket.ToTestString(),
)

// 05. publish document change event then store snapshot asynchronously.
// 06. publish document change event then store snapshot asynchronously.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
be.Background.AttachGoroutine(func(ctx context.Context) {
publisherID, err := clientInfo.ID.ToActorID()
Expand Down
44 changes: 25 additions & 19 deletions server/rpc/yorkie_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
Expand Down Expand Up @@ -160,7 +161,10 @@ func (s *yorkieServer) AttachDocument(
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,18 +247,18 @@ func (s *yorkieServer) DetachDocument(
return nil, err
}

var status document.StatusType
if req.Msg.RemoveIfNotAttached && !isAttached {
pack.IsRemoved = true
if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}
status = document.StatusRemoved
} else {
if err := clientInfo.DetachDocument(docInfo.ID); err != nil {
return nil, err
}
status = document.StatusDetached
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: status,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,11 +320,6 @@ func (s *yorkieServer) PushPullChanges(
}()
}

syncMode := types.SyncModePushPull
if req.Msg.PushOnly {
syncMode = types.SyncModePushOnly
}

clientInfo, err := clients.FindActiveClientInfo(ctx, s.backend.DB, types.ClientRefKey{
ProjectID: project.ID,
ClientID: types.IDFromActorID(actorID),
Expand All @@ -342,7 +341,15 @@ func (s *yorkieServer) PushPullChanges(
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, syncMode)
syncMode := types.SyncModePushPull
if req.Msg.PushOnly {
syncMode = types.SyncModePushOnly
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: syncMode,
Status: document.StatusAttached,
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -531,11 +538,10 @@ func (s *yorkieServer) RemoveDocument(
return nil, err
}

if err := clientInfo.RemoveDocument(docInfo.ID); err != nil {
return nil, err
}

pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, s.backend, project, clientInfo, docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusRemoved,
})
if err != nil {
return nil, err
}
Expand Down
30 changes: 24 additions & 6 deletions test/bench/push_pull_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ func benchmarkPushChanges(
assert.NoError(b, err)
b.StartTimer()

_, err = packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)
}
}
Expand All @@ -171,14 +174,20 @@ func benchmarkPullChanges(
}
docInfo, err := documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)

docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
b.StartTimer()

_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)
}
}
Expand Down Expand Up @@ -208,7 +217,10 @@ func benchmarkPushSnapshots(
assert.NoError(b, err)
b.StartTimer()

pulled, err := packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pushPack, types.SyncModePushPull)
pulled, err := packs.PushPull(ctx, be, project, clientInfos[0], docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)

b.StopTimer()
Expand Down Expand Up @@ -244,14 +256,20 @@ func benchmarkPullSnapshot(
}
docInfo, err := documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pusherClientInfo, docInfo, pushPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)

docInfo, err = documents.FindDocInfoByRefKey(ctx, be, docRefKey)
assert.NoError(b, err)
b.StartTimer()

_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, types.SyncModePushPull)
_, err = packs.PushPull(ctx, be, project, pullerClientInfo, docInfo, pullPack, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(b, err)
}
}
Expand Down
Loading