Skip to content

Commit

Permalink
Resolve conflicts on pull and rewrite local winner as new version to …
Browse files Browse the repository at this point in the history
…be pushed back
  • Loading branch information
bbrks committed Jan 10, 2025
1 parent 8a07d8d commit 2234e45
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 60 deletions.
8 changes: 4 additions & 4 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVecto
// for source if the local version for that source is lower
for i, v := range otherVector.PreviousVersions {
if hlv.PreviousVersions[i] == 0 {
hlv.setPreviousVersion(i, v)
hlv.SetPreviousVersion(i, v)
} else {
// if we get here then there is entry for this source in PV so we must check if its newer or not
otherHLVPVValue := v
localHLVPVValue := hlv.PreviousVersions[i]
if localHLVPVValue < otherHLVPVValue {
hlv.setPreviousVersion(i, v)
hlv.SetPreviousVersion(i, v)
}
}
}
Expand Down Expand Up @@ -375,8 +375,8 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi
return outputSpec
}

// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) {
// SetPreviousVersion will take a source/version pair and add it to the HLV previous versions map
func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) {
if hlv.PreviousVersions == nil {
hlv.PreviousVersions = make(HLVVersions)
}
Expand Down
118 changes: 103 additions & 15 deletions rest/utilities_testing_blip_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ const (
RevtreeSubtestName = "revTree"
)

type BlipTesterClientConflictResolverType string

const (
ConflictResolverLastWriteWins BlipTesterClientConflictResolverType = "lww"

ConflictResolverDefault = ConflictResolverLastWriteWins
)

func (c BlipTesterClientConflictResolverType) IsValid() bool {
switch c {
case ConflictResolverLastWriteWins:
return true
}
return false
}

type BlipTesterClientOpts struct {
ClientDeltas bool // Support deltas on the client side
Username string
Expand All @@ -62,6 +78,8 @@ type BlipTesterClientOpts struct {

// SourceID is used to define the SourceID for the blip client
SourceID string

ConflictResolver BlipTesterClientConflictResolverType
}

// defaultBlipTesterClientRevsLimit is the number of revisions sent as history when the client replicates - older revisions are not sent, and may not be stored.
Expand Down Expand Up @@ -281,6 +299,12 @@ func (cd *clientDoc) currentVersion(t testing.TB) *db.Version {
return &rev.version.CV
}

func (cd *clientDoc) _currentVersion(t testing.TB) *db.Version {
rev, err := cd._latestRev()
require.NoError(t, err)
return &rev.version.CV
}

type BlipTesterCollectionClient struct {
parent *BlipTesterClient

Expand Down Expand Up @@ -563,20 +587,41 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
doc.lock.Lock()
defer doc.lock.Unlock()

var incomingVersion DocVersion
var newVersion DocVersion
var hlv db.HybridLogicalVector
if btc.UseHLV() {
var incomingHLV *db.HybridLogicalVector
if revHistory != "" {
existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory)
incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory)
require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err)
hlv = *existingVersion
hlv = *incomingHLV
}
v, err := db.ParseVersion(revID)
incomingCV, err := db.ParseVersion(revID)
require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err)
newVersion = DocVersion{CV: v}
require.NoError(btr.TB(), hlv.AddVersion(v))
incomingVersion = DocVersion{CV: incomingCV}

clientCV := doc._currentVersion(btc.TB())
// incoming rev older than stored client version and comes from a different source - need to resolve
if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID {
btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV)
switch btc.BlipTesterClientOpts.ConflictResolver {
case ConflictResolverLastWriteWins:
// generate a new version for the resolution and write it to the remote HLV
v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())}
require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV")
newVersion = DocVersion{CV: v}
hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value)
default:
btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver)
}
} else {
newVersion = DocVersion{CV: incomingCV}
}
require.NoError(btc.TB(), hlv.AddVersion(newVersion.CV), "couldn't add newVersion CV into doc HLV")
} else {
newVersion = DocVersion{RevTreeID: revID}
incomingVersion = newVersion
}

docRev := clientDocRev{
Expand Down Expand Up @@ -606,12 +651,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
// store the new sequence for a replaced rev for tests waiting for this specific rev
doc._seqsByVersions[replacedVersion] = newClientSeq
}
doc._latestServerVersion = newVersion
// store the _incoming_ version - not newVersion - since we may have written a resolved conflict which will need pushing back
doc._latestServerVersion = incomingVersion

if !msg.NoReply() {
response := msg.Response()
response.SetBody([]byte(`[]`))
}

// new sequence written, wake up changes feeds for push
btcr._seqCond.Broadcast()
return
}

Expand Down Expand Up @@ -786,24 +835,53 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
doc.lock.Lock()
defer doc.lock.Unlock()

var newVersion DocVersion
var incomingVersion DocVersion
var versionToWrite DocVersion
var hlv db.HybridLogicalVector
if btc.UseHLV() {
var incomingHLV *db.HybridLogicalVector
if revHistory != "" {
existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory)
incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory)
require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err)
hlv = *existingVersion
hlv = *incomingHLV
}
v, err := db.ParseVersion(revID)
incomingCV, err := db.ParseVersion(revID)
require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err)
newVersion = DocVersion{CV: v}
require.NoError(btr.TB(), hlv.AddVersion(v))
incomingVersion = DocVersion{CV: incomingCV}

// fetch client's latest version to do conflict check and resolution
latestClientRev, err := doc._latestRev()
require.NoError(btc.TB(), err, "couldn't get latest revision for doc %q", docID)
if latestClientRev != nil {
clientCV := latestClientRev.version.CV

// incoming rev older than stored client version and comes from a different source - need to resolve
if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID {
btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV)
switch btc.BlipTesterClientOpts.ConflictResolver {
case ConflictResolverLastWriteWins:
// local wins so write the local body back as a new resolved version (based on incoming HLV) to push
body = latestClientRev.body
v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())}
require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV")
versionToWrite = DocVersion{CV: v}
hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value)
default:
btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver)
}
} else {
// no conflict - accept incoming rev
versionToWrite = DocVersion{CV: incomingCV}
}
}
require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV")
} else {
newVersion = DocVersion{RevTreeID: revID}
versionToWrite = DocVersion{RevTreeID: revID}
incomingVersion = versionToWrite
}
docRev := clientDocRev{
clientSeq: newClientSeq,
version: newVersion,
version: versionToWrite,
HLV: hlv,
body: body,
message: msg,
Expand All @@ -827,12 +905,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) {
// store the new sequence for a replaced rev for tests waiting for this specific rev
doc._seqsByVersions[replacedVersion] = newClientSeq
}
doc._latestServerVersion = newVersion
// store the _incoming_ version - not versionToWrite - since we may have written a resolved conflict which will need pushing back
doc._latestServerVersion = incomingVersion

if !msg.NoReply() {
response := msg.Response()
response.SetBody([]byte(`[]`))
}

// new sequence written, wake up changes feeds for push
btcr._seqCond.Broadcast()
}

btr.bt.blipContext.HandlerForProfile[db.MessageGetAttachment] = func(msg *blip.Message) {
Expand Down Expand Up @@ -999,6 +1081,12 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes
if !opts.AllowCreationWithoutBlipTesterClientRunner && !btcRunner.initialisedInsideRunnerCode {
require.FailNow(btcRunner.TB(), "must initialise BlipTesterClient inside Run() method")
}
if opts.ConflictResolver == "" {
opts.ConflictResolver = ConflictResolverDefault
}
if !opts.ConflictResolver.IsValid() {
require.FailNow(btcRunner.TB(), "invalid conflict resolver %q", opts.ConflictResolver)
}
if opts.SourceID == "" {
opts.SourceID = "blipclient"
}
Expand Down
6 changes: 5 additions & 1 deletion topologytest/couchbase_lite_mock_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID
bodyBytes, meta := p.getLatestDocVersion(dsName, docID)
require.NotNil(p.TB(), meta, "docID:%s not found on %s", docID, p)
var body db.Body
require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body))
// it's easier if all clients can return consistent bodies for tombstones
// lets just settle on nil, since we still need special handling anyway for `` vs `{}` so unmarshal doesn't barf
if len(bodyBytes) > 0 && string(bodyBytes) != base.EmptyDocument {
require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body))
}
return *meta, body
}

Expand Down
4 changes: 3 additions & 1 deletion topologytest/couchbase_server_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) (
require.NoError(peer.TB(), err)
// get hlv to construct DocVersion
var body db.Body
require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body))
if len(docBytes) > 0 {
require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body))
}
return getDocVersion(docID, peer, cas, xattrs), body
}
48 changes: 27 additions & 21 deletions topologytest/hlv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -65,6 +66,25 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p
}
}

// waitForConvergingVersion waits for the same document version to reach all peers.
func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) {
t.Logf("waiting for converged doc versions across all peers")
require.EventuallyWithT(t, func(c *assert.CollectT) {
for peerAid, peerA := range peers.SortedPeers() {
docMetaA, bodyA := peerA.GetDocument(dsName, docID)
for peerBid, peerB := range peers.SortedPeers() {
if peerAid == peerBid {
continue
}
docMetaB, bodyB := peerB.GetDocument(dsName, docID)
cvA, cvB := docMetaA.CV(t), docMetaB.CV(t)
require.Equalf(c, cvA, cvB, "CV mismatch: %s:%#v != %s:%#v", peerAid, docMetaA, peerBid, docMetaB)
require.Equalf(c, bodyA, bodyB, "body mismatch: %s:%s != %s:%s", peerAid, bodyA, peerBid, bodyB)
}
}
}, totalWaitTime, pollInterval)
}

// removeSyncGatewayBackingPeers will check if there is sync gateway in topology, if so will track the backing CBS
// so we can skip creating docs on these peers (avoiding conflicts between docs created on the SGW and cbs)
func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool {
Expand All @@ -80,9 +100,9 @@ func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool {
return peersToRemove
}

// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then
// returns the last peer to have a doc created on it
func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) {
// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents.
// It is not known at this stage which write the "winner" will be, since conflict resolution can happen at replication time which may not be LWW, or may be LWW but with a new value.
func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) {
backingPeers := removeSyncGatewayBackingPeers(peers)
documentVersion := make([]BodyAndVersion, 0, len(peers))
for peerName, peer := range peers {
Expand All @@ -94,15 +114,10 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee
t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta)
documentVersion = append(documentVersion, docVersion)
}
index := len(documentVersion) - 1
lastWrite = documentVersion[index]

return lastWrite
}

// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then
// returns the last peer to have a doc updated on it.
func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) {
// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations
func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) {
backingPeers := removeSyncGatewayBackingPeers(peers)
var documentVersion []BodyAndVersion
for peerName, peer := range peers {
Expand All @@ -114,15 +129,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee
t.Logf("updateVersion: %#v", docVersion.docMeta)
documentVersion = append(documentVersion, docVersion)
}
index := len(documentVersion) - 1
lastWrite = documentVersion[index]

return lastWrite
}

// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then
// returns the last peer to have a doc deleted on it
func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) (lastWrite BodyAndVersion) {
// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions
func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) {
backingPeers := removeSyncGatewayBackingPeers(peers)
var documentVersion []BodyAndVersion
for peerName, peer := range peers {
Expand All @@ -133,10 +143,6 @@ func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers
t.Logf("deleteVersion: %#v", deleteVersion)
documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
}
index := len(documentVersion) - 1
lastWrite = documentVersion[index]

return lastWrite
}

// getDocID returns a unique doc ID for the test case. Note: when running with Couchbase Server and -count > 1, this will return duplicate IDs for count 2 and higher and they can conflict due to the way bucket pool works.
Expand Down
Loading

0 comments on commit 2234e45

Please sign in to comment.