From 2234e458055e516e8bb1144eb8949647d18b67be Mon Sep 17 00:00:00 2001 From: Ben Brooks Date: Fri, 10 Jan 2025 21:27:02 +0000 Subject: [PATCH] Resolve conflicts on pull and rewrite local winner as new version to be pushed back --- db/hybrid_logical_vector.go | 8 +- rest/utilities_testing_blip_client.go | 118 +++++++++++++++--- topologytest/couchbase_lite_mock_peer_test.go | 6 +- topologytest/couchbase_server_peer_test.go | 4 +- topologytest/hlv_test.go | 48 +++---- topologytest/multi_actor_conflict_test.go | 33 +++-- topologytest/sync_gateway_peer_test.go | 6 +- 7 files changed, 163 insertions(+), 60 deletions(-) diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 1d227001fa..f6bd9bafb1 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -340,13 +340,13 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVecto // for source if the local version for that source is lower for i, v := range otherVector.PreviousVersions { if hlv.PreviousVersions[i] == 0 { - hlv.setPreviousVersion(i, v) + hlv.SetPreviousVersion(i, v) } else { // if we get here then there is entry for this source in PV so we must check if its newer or not otherHLVPVValue := v localHLVPVValue := hlv.PreviousVersions[i] if localHLVPVValue < otherHLVPVValue { - hlv.setPreviousVersion(i, v) + hlv.SetPreviousVersion(i, v) } } } @@ -375,8 +375,8 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi return outputSpec } -// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map -func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) { +// SetPreviousVersion will take a source/version pair and add it to the HLV previous versions map +func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) { if hlv.PreviousVersions == nil { hlv.PreviousVersions = make(HLVVersions) } diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index 9abb2b2766..0157dd67db 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -37,6 +37,22 @@ const ( RevtreeSubtestName = "revTree" ) +type BlipTesterClientConflictResolverType string + +const ( + ConflictResolverLastWriteWins BlipTesterClientConflictResolverType = "lww" + + ConflictResolverDefault = ConflictResolverLastWriteWins +) + +func (c BlipTesterClientConflictResolverType) IsValid() bool { + switch c { + case ConflictResolverLastWriteWins: + return true + } + return false +} + type BlipTesterClientOpts struct { ClientDeltas bool // Support deltas on the client side Username string @@ -62,6 +78,8 @@ type BlipTesterClientOpts struct { // SourceID is used to define the SourceID for the blip client SourceID string + + ConflictResolver BlipTesterClientConflictResolverType } // defaultBlipTesterClientRevsLimit is the number of revisions sent as history when the client replicates - older revisions are not sent, and may not be stored. @@ -281,6 +299,12 @@ func (cd *clientDoc) currentVersion(t testing.TB) *db.Version { return &rev.version.CV } +func (cd *clientDoc) _currentVersion(t testing.TB) *db.Version { + rev, err := cd._latestRev() + require.NoError(t, err) + return &rev.version.CV +} + type BlipTesterCollectionClient struct { parent *BlipTesterClient @@ -563,20 +587,41 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { doc.lock.Lock() defer doc.lock.Unlock() + var incomingVersion DocVersion var newVersion DocVersion var hlv db.HybridLogicalVector if btc.UseHLV() { + var incomingHLV *db.HybridLogicalVector if revHistory != "" { - existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) + incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory) require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err) - hlv = *existingVersion + hlv = *incomingHLV } - v, err := db.ParseVersion(revID) + incomingCV, err := db.ParseVersion(revID) require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err) - newVersion = DocVersion{CV: v} - require.NoError(btr.TB(), hlv.AddVersion(v)) + incomingVersion = DocVersion{CV: incomingCV} + + clientCV := doc._currentVersion(btc.TB()) + // incoming rev older than stored client version and comes from a different source - need to resolve + if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { + btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) + switch btc.BlipTesterClientOpts.ConflictResolver { + case ConflictResolverLastWriteWins: + // generate a new version for the resolution and write it to the remote HLV + v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} + require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV") + newVersion = DocVersion{CV: v} + hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) + default: + btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + } + } else { + newVersion = DocVersion{CV: incomingCV} + } + require.NoError(btc.TB(), hlv.AddVersion(newVersion.CV), "couldn't add newVersion CV into doc HLV") } else { newVersion = DocVersion{RevTreeID: revID} + incomingVersion = newVersion } docRev := clientDocRev{ @@ -606,12 +651,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // store the new sequence for a replaced rev for tests waiting for this specific rev doc._seqsByVersions[replacedVersion] = newClientSeq } - doc._latestServerVersion = newVersion + // store the _incoming_ version - not newVersion - since we may have written a resolved conflict which will need pushing back + doc._latestServerVersion = incomingVersion if !msg.NoReply() { response := msg.Response() response.SetBody([]byte(`[]`)) } + + // new sequence written, wake up changes feeds for push + btcr._seqCond.Broadcast() return } @@ -786,24 +835,53 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { doc.lock.Lock() defer doc.lock.Unlock() - var newVersion DocVersion + var incomingVersion DocVersion + var versionToWrite DocVersion var hlv db.HybridLogicalVector if btc.UseHLV() { + var incomingHLV *db.HybridLogicalVector if revHistory != "" { - existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) + incomingHLV, _, err = db.ExtractHLVFromBlipMessage(revHistory) require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err) - hlv = *existingVersion + hlv = *incomingHLV } - v, err := db.ParseVersion(revID) + incomingCV, err := db.ParseVersion(revID) require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err) - newVersion = DocVersion{CV: v} - require.NoError(btr.TB(), hlv.AddVersion(v)) + incomingVersion = DocVersion{CV: incomingCV} + + // fetch client's latest version to do conflict check and resolution + latestClientRev, err := doc._latestRev() + require.NoError(btc.TB(), err, "couldn't get latest revision for doc %q", docID) + if latestClientRev != nil { + clientCV := latestClientRev.version.CV + + // incoming rev older than stored client version and comes from a different source - need to resolve + if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { + btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) + switch btc.BlipTesterClientOpts.ConflictResolver { + case ConflictResolverLastWriteWins: + // local wins so write the local body back as a new resolved version (based on incoming HLV) to push + body = latestClientRev.body + v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} + require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add incoming HLV into client HLV") + versionToWrite = DocVersion{CV: v} + hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) + default: + btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + } + } else { + // no conflict - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} + } + } + require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV") } else { - newVersion = DocVersion{RevTreeID: revID} + versionToWrite = DocVersion{RevTreeID: revID} + incomingVersion = versionToWrite } docRev := clientDocRev{ clientSeq: newClientSeq, - version: newVersion, + version: versionToWrite, HLV: hlv, body: body, message: msg, @@ -827,12 +905,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // store the new sequence for a replaced rev for tests waiting for this specific rev doc._seqsByVersions[replacedVersion] = newClientSeq } - doc._latestServerVersion = newVersion + // store the _incoming_ version - not versionToWrite - since we may have written a resolved conflict which will need pushing back + doc._latestServerVersion = incomingVersion if !msg.NoReply() { response := msg.Response() response.SetBody([]byte(`[]`)) } + + // new sequence written, wake up changes feeds for push + btcr._seqCond.Broadcast() } btr.bt.blipContext.HandlerForProfile[db.MessageGetAttachment] = func(msg *blip.Message) { @@ -999,6 +1081,12 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes if !opts.AllowCreationWithoutBlipTesterClientRunner && !btcRunner.initialisedInsideRunnerCode { require.FailNow(btcRunner.TB(), "must initialise BlipTesterClient inside Run() method") } + if opts.ConflictResolver == "" { + opts.ConflictResolver = ConflictResolverDefault + } + if !opts.ConflictResolver.IsValid() { + require.FailNow(btcRunner.TB(), "invalid conflict resolver %q", opts.ConflictResolver) + } if opts.SourceID == "" { opts.SourceID = "blipclient" } diff --git a/topologytest/couchbase_lite_mock_peer_test.go b/topologytest/couchbase_lite_mock_peer_test.go index 24c806dce2..1c934ae02c 100644 --- a/topologytest/couchbase_lite_mock_peer_test.go +++ b/topologytest/couchbase_lite_mock_peer_test.go @@ -65,7 +65,11 @@ func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID bodyBytes, meta := p.getLatestDocVersion(dsName, docID) require.NotNil(p.TB(), meta, "docID:%s not found on %s", docID, p) var body db.Body - require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body)) + // it's easier if all clients can return consistent bodies for tombstones + // lets just settle on nil, since we still need special handling anyway for `` vs `{}` so unmarshal doesn't barf + if len(bodyBytes) > 0 && string(bodyBytes) != base.EmptyDocument { + require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body)) + } return *meta, body } diff --git a/topologytest/couchbase_server_peer_test.go b/topologytest/couchbase_server_peer_test.go index 6150aadc2c..ef2e832a84 100644 --- a/topologytest/couchbase_server_peer_test.go +++ b/topologytest/couchbase_server_peer_test.go @@ -327,6 +327,8 @@ func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) ( require.NoError(peer.TB(), err) // get hlv to construct DocVersion var body db.Body - require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body)) + if len(docBytes) > 0 { + require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body)) + } return getDocVersion(docID, peer, cas, xattrs), body } diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index 7591cdc0b1..95e49ffcdb 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -15,6 +15,7 @@ import ( "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/db" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -65,6 +66,25 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p } } +// waitForConvergingVersion waits for the same document version to reach all peers. +func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { + t.Logf("waiting for converged doc versions across all peers") + require.EventuallyWithT(t, func(c *assert.CollectT) { + for peerAid, peerA := range peers.SortedPeers() { + docMetaA, bodyA := peerA.GetDocument(dsName, docID) + for peerBid, peerB := range peers.SortedPeers() { + if peerAid == peerBid { + continue + } + docMetaB, bodyB := peerB.GetDocument(dsName, docID) + cvA, cvB := docMetaA.CV(t), docMetaB.CV(t) + require.Equalf(c, cvA, cvB, "CV mismatch: %s:%#v != %s:%#v", peerAid, docMetaA, peerBid, docMetaB) + require.Equalf(c, bodyA, bodyB, "body mismatch: %s:%s != %s:%s", peerAid, bodyA, peerBid, bodyB) + } + } + }, totalWaitTime, pollInterval) +} + // removeSyncGatewayBackingPeers will check if there is sync gateway in topology, if so will track the backing CBS // so we can skip creating docs on these peers (avoiding conflicts between docs created on the SGW and cbs) func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool { @@ -80,9 +100,9 @@ func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool { return peersToRemove } -// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then -// returns the last peer to have a doc created on it -func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) { +// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents. +// It is not known at this stage which write the "winner" will be, since conflict resolution can happen at replication time which may not be LWW, or may be LWW but with a new value. +func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) documentVersion := make([]BodyAndVersion, 0, len(peers)) for peerName, peer := range peers { @@ -94,15 +114,10 @@ func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta) documentVersion = append(documentVersion, docVersion) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } -// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then -// returns the last peer to have a doc updated on it. -func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) { +// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations +func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) var documentVersion []BodyAndVersion for peerName, peer := range peers { @@ -114,15 +129,10 @@ func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, pee t.Logf("updateVersion: %#v", docVersion.docMeta) documentVersion = append(documentVersion, docVersion) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } -// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then -// returns the last peer to have a doc deleted on it -func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) (lastWrite BodyAndVersion) { +// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions +func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { backingPeers := removeSyncGatewayBackingPeers(peers) var documentVersion []BodyAndVersion for peerName, peer := range peers { @@ -133,10 +143,6 @@ func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers t.Logf("deleteVersion: %#v", deleteVersion) documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName}) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } // getDocID returns a unique doc ID for the test case. Note: when running with Couchbase Server and -count > 1, this will return duplicate IDs for count 2 and higher and they can conflict due to the way bucket pool works. diff --git a/topologytest/multi_actor_conflict_test.go b/topologytest/multi_actor_conflict_test.go index 7765f7a23e..a1f1bab259 100644 --- a/topologytest/multi_actor_conflict_test.go +++ b/topologytest/multi_actor_conflict_test.go @@ -23,10 +23,9 @@ func TestMultiActorConflictCreate(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, docID, docVersion) - + waitForConvergingVersion(t, collectionName, peers, docID) }) } } @@ -46,16 +45,16 @@ func TestMultiActorConflictUpdate(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, docID) replications.Stop() - docVersion = updateConflictingDocs(t, collectionName, peers, docID, topology.description) + updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, docID) }) } } @@ -75,16 +74,16 @@ func TestMultiActorConflictDelete(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, docID) replications.Stop() - lastWrite := deleteConflictDocs(t, collectionName, peers, docID) + deleteConflictDocs(t, collectionName, peers, docID) replications.Start() - waitForTombstoneVersion(t, collectionName, peers, docID, lastWrite) + waitForConvergingVersion(t, collectionName, peers, docID) }) } } @@ -108,23 +107,23 @@ func TestMultiActorConflictResurrect(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, docID) replications.Stop() - lastWrite := deleteConflictDocs(t, collectionName, peers, docID) + deleteConflictDocs(t, collectionName, peers, docID) replications.Start() - waitForTombstoneVersion(t, collectionName, peers, docID, lastWrite) + waitForConvergingVersion(t, collectionName, peers, docID) replications.Stop() - lastWriteVersion := updateConflictingDocs(t, collectionName, peers, docID, topology.description) + updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, docID, lastWriteVersion) + waitForConvergingVersion(t, collectionName, peers, docID) }) } } diff --git a/topologytest/sync_gateway_peer_test.go b/topologytest/sync_gateway_peer_test.go index ae9e3e18ef..de846d1e09 100644 --- a/topologytest/sync_gateway_peer_test.go +++ b/topologytest/sync_gateway_peer_test.go @@ -63,7 +63,11 @@ func (p *SyncGatewayPeer) GetDocument(dsName sgbucket.DataStoreName, docID strin collection, ctx := p.getCollection(dsName) doc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalAll) require.NoError(p.TB(), err) - return DocMetadataFromDocument(doc), doc.Body(ctx) + var body db.Body + if !doc.IsDeleted() { + body = doc.Body(ctx) + } + return DocMetadataFromDocument(doc), body } // CreateDocument creates a document on the peer. The test will fail if the document already exists.