Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove duplicated version vector from pack #1024

Merged
merged 5 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,19 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

minSyncedVersionVector, err := FromVersionVector(pbPack.MinSyncedVersionVector)
if err != nil {
return nil, err
}

minSyncedTicket, err := fromTimeTicket(pbPack.MinSyncedTicket)
if err != nil {
return nil, err
}

pack := &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
MinSyncedVersionVector: minSyncedVersionVector,
MinSyncedTicket: minSyncedTicket,
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
IsRemoved: pbPack.IsRemoved,
VersionVector: versionVector,
MinSyncedTicket: minSyncedTicket,
}

return pack, nil
Expand Down
6 changes: 0 additions & 6 deletions api/docs/yorkie/v1/resources.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,6 @@ components:
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
6 changes: 0 additions & 6 deletions api/docs/yorkie/v1/yorkie.openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -470,12 +470,6 @@ components:
description: Deprecated
title: min_synced_ticket
type: object
minSyncedVersionVector:
$ref: '#/components/schemas/yorkie.v1.VersionVector'
additionalProperties: false
description: ""
title: min_synced_version_vector
type: object
snapshot:
additionalProperties: false
description: ""
Expand Down
1,646 changes: 816 additions & 830 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ message ChangePack {
repeated Change changes = 4;
bool is_removed = 6;
VersionVector version_vector = 7;
VersionVector min_synced_version_vector = 8;

TimeTicket min_synced_ticket = 5; // Deprecated
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/document/change/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ type Pack struct {
// 2. In response(Snapshot), it is the version vector of the snapshot of the document.
VersionVector time.VersionVector

// TODO(hackerwins): Consider to merge MinSyncedVersionVector with VersionVector.
// MinSyncedVersionVector is the minimum version vector taken by clients who
// attach the document.
MinSyncedVersionVector time.VersionVector

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

Expand Down
12 changes: 7 additions & 5 deletions pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ func (d *Document) Update(
// ApplyChangePack applies the given change pack into this document.
func (d *Document) ApplyChangePack(pack *change.Pack) error {
// 01. Apply remote changes to both the cloneRoot and the document.
if len(pack.Snapshot) > 0 {
hasSnapshot := len(pack.Snapshot) > 0

if hasSnapshot {
d.cloneRoot = nil
d.clonePresences = nil
if err := d.doc.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
Expand Down Expand Up @@ -215,13 +217,13 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
d.doc.checkpoint = d.doc.checkpoint.Forward(pack.Checkpoint)

// 04. Do Garbage collection.
if !d.options.DisableGC {
d.GarbageCollect(pack.MinSyncedVersionVector)
if !d.options.DisableGC && !hasSnapshot {
d.GarbageCollect(pack.VersionVector)
}

// 05. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if pack.VersionVector != nil && !hasSnapshot {
actorIDs, err := pack.VersionVector.Keys()
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/document/document_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ func TestDocument(t *testing.T) {
docB := document.New("doc")
docB.SetActor(actorB)
assert.Equal(t, "{}", docB.VersionVector().Marshal())
// NOTE(JOOHOJANG): Normally, docB's Lamport timestamp should be included in pack.versionVector because pack is applied after docB is attached.
// However, since this is not the case in this test method, docB's Lamport timestamp is manually added to packA's versionVector.
// In actual use, since changePacks cannot be exchanged directly between clients without going through a server, the following handling was added.
packA.VersionVector.Set(docB.ActorID(), docB.VersionVector().VersionOf(docB.ActorID()))
assert.NoError(t, docB.ApplyChangePack(packA))
assert.Equal(t, "{000000000000000000000001:2,000000000000000000000002:3}", docB.VersionVector().Marshal())

Expand Down
12 changes: 7 additions & 5 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ func (d *InternalDocument) HasLocalChanges() bool {

// ApplyChangePack applies the given change pack into this document.
func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) error {
hasSnapshot := len(pack.Snapshot) > 0

// 01. Apply remote changes to both the cloneRoot and the document.
if len(pack.Snapshot) > 0 {
if hasSnapshot {
if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.VersionVector); err != nil {
return err
}
Expand All @@ -167,15 +169,15 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack, disableGC bool) er
// 03. Update the checkpoint.
d.checkpoint = d.checkpoint.Forward(pack.Checkpoint)

if !disableGC && pack.MinSyncedTicket != nil {
if _, err := d.GarbageCollect(pack.MinSyncedVersionVector); err != nil {
if !disableGC && pack.VersionVector != nil && !hasSnapshot {
if _, err := d.GarbageCollect(pack.VersionVector); err != nil {
return err
}
}

// 04. Remove detached client's lamport from version vector if it exists
if pack.MinSyncedVersionVector != nil {
actorIDs, err := pack.MinSyncedVersionVector.Keys()
if pack.VersionVector != nil && !hasSnapshot {
actorIDs, err := pack.VersionVector.Keys()
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ func PushPull(
if err != nil {
return nil, err
}
respPack.MinSyncedVersionVector = minSyncedVersionVector
if respPack.SnapshotLen() == 0 {
respPack.VersionVector = minSyncedVersionVector
}

// TODO(hackerwins): This is a previous implementation before the version
// vector was introduced. But it is necessary to support the previous
Expand Down Expand Up @@ -214,7 +216,7 @@ func PushPull(
ctx,
be,
docInfo,
minSyncedTicket,
minSyncedVersionVector,
); err != nil {
logging.From(ctx).Error(err)
}
Expand Down
14 changes: 0 additions & 14 deletions server/packs/serverpacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ type ServerPack struct {
// 2. In response(Snapshot), it is the version vector of the snapshot of the document.
VersionVector time.VersionVector

// TODO(hackerwins): Consider to merge MinSyncedVersionVector with VersionVector.
// MinSyncedVersionVector is the minimum version vector taken by clients who
// attach the document.
MinSyncedVersionVector time.VersionVector

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

Expand Down Expand Up @@ -150,15 +145,6 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) {

pbPack.VersionVector = pbVersionVector

if p.MinSyncedVersionVector != nil {
pbMinSyncedVersionVector, err := converter.ToVersionVector(p.MinSyncedVersionVector)
if err != nil {
return nil, err
}

pbPack.MinSyncedVersionVector = pbMinSyncedVersionVector
}

return pbPack, nil
}

Expand Down
14 changes: 10 additions & 4 deletions server/packs/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func storeSnapshot(
ctx context.Context,
be *backend.Backend,
docInfo *database.DocInfo,
minSyncedTicket *time.Ticket,
minSyncedVersionVector time.VersionVector,
) error {
// 01. get the closest snapshot's metadata of this docInfo
docRefKey := docInfo.RefKey()
Expand Down Expand Up @@ -91,13 +91,19 @@ func storeSnapshot(
nil,
nil,
)
pack.MinSyncedTicket = minSyncedTicket

if err := doc.ApplyChangePack(pack, be.Config.SnapshotDisableGC); err != nil {
return err
}

// 04. save the snapshot of the docInfo
// 04. perform garbage collect to remove tombstones
if !be.Config.SnapshotDisableGC {
if _, err := doc.GarbageCollect(minSyncedVersionVector); err != nil {
return err
}
}

// 05. save the snapshot of the docInfo
if err := be.DB.CreateSnapshotInfo(
ctx,
docRefKey,
Expand All @@ -106,7 +112,7 @@ func storeSnapshot(
return err
}

// 05. delete changes before the smallest in `syncedseqs` to save storage.
// 06. delete changes before the smallest in `syncedseqs` to save storage.
if be.Config.SnapshotWithPurgingChanges {
if err := be.DB.PurgeStaleChanges(
ctx,
Expand Down