diff --git a/db/hybrid_logical_vector.go b/db/hybrid_logical_vector.go index 9fce119b10..4ef559db7c 100644 --- a/db/hybrid_logical_vector.go +++ b/db/hybrid_logical_vector.go @@ -273,7 +273,7 @@ func (hlv *HybridLogicalVector) InvalidateMV() { if source == hlv.SourceID { continue } - hlv.setPreviousVersion(source, value) + hlv.SetPreviousVersion(source, value) } hlv.MergeVersions = nil } @@ -346,13 +346,13 @@ func (hlv *HybridLogicalVector) AddNewerVersions(otherVector *HybridLogicalVecto // for source if the local version for that source is lower for i, v := range otherVector.PreviousVersions { if hlv.PreviousVersions[i] == 0 { - hlv.setPreviousVersion(i, v) + hlv.SetPreviousVersion(i, v) } else { // if we get here then there is entry for this source in PV so we must check if its newer or not otherHLVPVValue := v localHLVPVValue := hlv.PreviousVersions[i] if localHLVPVValue < otherHLVPVValue { - hlv.setPreviousVersion(i, v) + hlv.SetPreviousVersion(i, v) } } } @@ -384,8 +384,8 @@ func (hlv *HybridLogicalVector) computeMacroExpansions() []sgbucket.MacroExpansi return outputSpec } -// setPreviousVersion will take a source/version pair and add it to the HLV previous versions map -func (hlv *HybridLogicalVector) setPreviousVersion(source string, version uint64) { +// SetPreviousVersion takes a source/version pair and sets the value for the given source in the previous versions map +func (hlv *HybridLogicalVector) SetPreviousVersion(source string, version uint64) { if hlv.PreviousVersions == nil { hlv.PreviousVersions = make(HLVVersions) } @@ -443,6 +443,42 @@ func (hlv *HybridLogicalVector) ToHistoryForHLV() string { return s.String() } +// FromHistoryForHLV builds a HybridLogicalVector from the PV (and optional MV) sections of an HLV history string +func FromHistoryForHLV(history string) (*HybridLogicalVector, error) { + hlv := NewHybridLogicalVector() + // split the history string into PV and MV + versionSets := strings.Split(history, ";") + switch len(versionSets) { + case 0: + // no versions present + return hlv, nil + case 2: + // MV + mvs := strings.Split(versionSets[1], ",") + for _, mv := range mvs { + v, err := ParseVersion(mv) + if err != nil { + return nil, err + } + hlv.MergeVersions[v.SourceID] = v.Value + } + fallthrough + case 1: + // PV + pvs := strings.Split(versionSets[0], ",") + for _, pv := range pvs { + v, err := ParseVersion(pv) + if err != nil { + return nil, err + } + hlv.PreviousVersions[v.SourceID] = v.Value + } + default: + return nil, fmt.Errorf("invalid history string format") + } + return hlv, nil +} + // appendRevocationMacroExpansions adds macro expansions for the channel map. Not strictly an HLV operation // but putting the function here as it's required when the HLV's current version is being macro expanded func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, channelNames []string) (updatedSpec []sgbucket.MacroExpansionSpec) { @@ -456,9 +492,9 @@ func appendRevocationMacroExpansions(currentSpec []sgbucket.MacroExpansionSpec, // ExtractHLVFromBlipMessage extracts the full HLV a string in the format seen over Blip // blip string may be the following formats -// 1. cv only: cv -// 2. cv and pv: cv;pv -// 3. cv, pv, and mv: cv;mv;pv +// 1. cv only: cv +// 2. cv and pv: cv;pv1,pv2 +// 3. 
cv+mv and pv: cv,mv1,mv2;pv1,pv2 // // Function will return list of revIDs if legacy rev ID was found in the HLV history section (PV) // TODO: CBG-3662 - Optimise once we've settled on and tested the format with CBL diff --git a/rest/blip_api_delta_sync_test.go b/rest/blip_api_delta_sync_test.go index 7c5ca79f04..b69e2dcd50 100644 --- a/rest/blip_api_delta_sync_test.go +++ b/rest/blip_api_delta_sync_test.go @@ -960,18 +960,22 @@ func TestBlipNonDeltaSyncPush(t *testing.T) { defer client.Close() client.ClientDeltas = false - btcRunner.StartPull(client.id) - btcRunner.StartPush(client.id) // create doc1 rev 1-0335a345b6ffed05707ccc4cbc1b67f4 version := rt.PutDocDirectly(docID, JsonToMap(t, `{"greetings": [{"hello": "world!"}, {"hi": "alice"}]}`)) + btcRunner.StartOneshotPull(client.id) data := btcRunner.WaitForVersion(client.id, docID, version) assert.Equal(t, `{"greetings":[{"hello":"world!"},{"hi":"alice"}]}`, string(data)) + // create doc1 rev 2-abcxyz on client newRev := btcRunner.AddRev(client.id, docID, &version, []byte(`{"greetings":[{"hello":"world!"},{"hi":"alice"},{"howdy":"bob"}]}`)) - // Check EE is delta, and CE is full-body replication + + btcRunner.StartPushWithOpts(client.id, BlipTesterPushOptions{Continuous: false, Since: "0"}) + msg := client.waitForReplicationMessage(collection, 2) + // ensure message is type rev + require.Equal(t, db.MessageRev, msg.Profile()) // Check the request was NOT sent with a deltaSrc property assert.Equal(t, "", msg.Properties[db.RevMessageDeltaSrc]) diff --git a/rest/utilities_testing_blip_client.go b/rest/utilities_testing_blip_client.go index a14c2e1b2c..1311711e6a 100644 --- a/rest/utilities_testing_blip_client.go +++ b/rest/utilities_testing_blip_client.go @@ -37,6 +37,22 @@ const ( RevtreeSubtestName = "revTree" ) +type BlipTesterClientConflictResolverType string + +const ( + ConflictResolverLastWriteWins BlipTesterClientConflictResolverType = "lww" + + ConflictResolverDefault = ConflictResolverLastWriteWins +) + +func (c BlipTesterClientConflictResolverType) IsValid() bool { + switch c { + case ConflictResolverLastWriteWins: + return true + } + return false +} + type BlipTesterClientOpts struct { ClientDeltas bool // Support deltas on the client side Username string @@ -62,6 +78,8 @@ type BlipTesterClientOpts struct { // SourceID is used to define the SourceID for the blip client SourceID string + + ConflictResolver BlipTesterClientConflictResolverType } // defaultBlipTesterClientRevsLimit is the number of revisions sent as history when the client replicates - older revisions are not sent, and may not be stored. 
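Usage sketch (illustrative, not part of the diff) for the new conflict resolver option: a test opts in via BlipTesterClientOpts, and NewBlipTesterClientOptsWithRT defaults an empty value to ConflictResolverDefault (last-write-wins) and fails the test on an unrecognised value. The surrounding RestTester/BlipTestClientRunner setup is assumed to come from the usual test harness.

	opts := &BlipTesterClientOpts{
		Username:         "alice",
		ConflictResolver: ConflictResolverLastWriteWins, // explicit, but equivalent to leaving it empty
	}
	client := btcRunner.NewBlipTesterClientOptsWithRT(rt, opts)
	defer client.Close()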
@@ -118,7 +136,7 @@ func (c *BlipTesterCollectionClient) OneShotDocsSince(ctx context.Context, since continue } else if latestDocSeq := doc.latestSeq(); latestDocSeq != seq { // this entry should've been cleaned up from _seqStore - require.FailNow(c.TB(), "seq %d found in _seqStore but latestSeq for doc %d - this should've been pruned out!", seq, latestDocSeq) + require.FailNowf(c.TB(), "found old seq in _seqStore", "seq %d found in _seqStore but latestSeq for doc %d - this should've been pruned out!", seq, latestDocSeq) continue } if !yield(seq, doc) { @@ -366,7 +384,7 @@ func (btcc *BlipTesterCollectionClient) _getClientDoc(docID string) (*clientDoc, } clientDoc, ok := btcc._seqStore[seq] if !ok { - require.FailNow(btcc.TB(), "docID %q found in _seqFromDocID but seq %d not in _seqStore %v", docID, seq, btcc._seqStore) + require.FailNowf(btcc.TB(), "seq not found in _seqStore", "docID %q found in _seqFromDocID but seq %d not in _seqStore %v", docID, seq, btcc._seqStore) return nil, false } return clientDoc, ok @@ -571,28 +589,68 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { doc.lock.Lock() defer doc.lock.Unlock() - var newVersion DocVersion + var incomingVersion DocVersion + var versionToWrite DocVersion var hlv db.HybridLogicalVector + isDelete := true if btc.UseHLV() { + var incomingHLV *db.HybridLogicalVector if revHistory != "" { - existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) - require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err) - hlv = *existingVersion + // TODO: Replace with new Beta version/handling + incomingHLV, err = db.FromHistoryForHLV(revHistory) + require.NoError(btr.TB(), err, "error extracting HLV history %q: %v", revHistory, err) + hlv = *incomingHLV } - v, err := db.ParseVersion(revID) + incomingCV, err := db.ParseVersion(revID) require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err) - newVersion = DocVersion{CV: v} - require.NoError(btr.TB(), hlv.AddVersion(v)) + incomingVersion = DocVersion{CV: incomingCV} + + // fetch client's latest version to do conflict check and resolution + latestClientRev, err := doc._latestRev() + require.NoError(btc.TB(), err, "couldn't get latest revision for doc %q", docID) + if latestClientRev != nil { + clientCV := latestClientRev.version.CV + + // safety check - ensure SG is not sending a rev that we already had - ensures changes feed messaging is working correctly to prevent re-sending revisions the client already has + if clientCV.SourceID == incomingCV.SourceID && clientCV.Value == incomingCV.Value { + require.FailNowf(btc.TB(), "incoming revision is equal to client revision", "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) + } + + // incoming rev older than stored client version and comes from a different source - need to resolve + if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { + btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) + switch btc.BlipTesterClientOpts.ConflictResolver { + case ConflictResolverLastWriteWins: + // local wins so write the local body back as a new resolved version (based on incoming HLV) to push + body = latestClientRev.body + isDelete = latestClientRev.isDelete + v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} + require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add resolved version into 
client HLV") + versionToWrite = DocVersion{CV: v} + hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) + default: + btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + } + } else { + // no conflict - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} + } + } else { + // no existing rev - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} + } + require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV") } else { - newVersion = DocVersion{RevTreeID: revID} + versionToWrite = DocVersion{RevTreeID: revID} + incomingVersion = versionToWrite } - docRev := clientDocRev{ + clientSeq: newClientSeq, - version: newVersion, + version: versionToWrite, body: body, HLV: hlv, - isDelete: true, + isDelete: isDelete, message: msg, } @@ -614,12 +672,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // store the new sequence for a replaced rev for tests waiting for this specific rev doc._seqsByVersions[replacedVersion] = newClientSeq } - doc._latestServerVersion = newVersion + // store the _incoming_ version - not newVersion - since we may have written a resolved conflict which will need pushing back + doc._latestServerVersion = incomingVersion if !msg.NoReply() { response := msg.Response() response.SetBody([]byte(`[]`)) } + + // new sequence written, wake up changes feeds for push + btcr._seqCond.Broadcast() return } @@ -635,7 +697,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { response.SetError("HTTP", http.StatusUnprocessableEntity, "test code intentionally rejected delta") return } - require.FailNow(btr.TB(), "expected delta rev message to be sent without noreply flag: %+v", msg) + require.FailNowf(btr.TB(), "expected delta rev message to be sent without noreply flag", "msg: %+v", msg) } // unmarshal body to extract deltaSrc @@ -646,7 +708,7 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { var old db.Body doc, ok := btcr.getClientDoc(docID) if !ok { - require.FailNow(btc.TB(), "docID %q not found in _seqFromDocID", docID) + require.FailNowf(btc.TB(), "doc not found", "docID %q not found", docID) return } var deltaSrcVersion DocVersion @@ -794,24 +856,61 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { doc.lock.Lock() defer doc.lock.Unlock() - var newVersion DocVersion + var incomingVersion DocVersion + var versionToWrite DocVersion var hlv db.HybridLogicalVector if btc.UseHLV() { + var incomingHLV *db.HybridLogicalVector if revHistory != "" { - existingVersion, _, err := db.ExtractHLVFromBlipMessage(revHistory) - require.NoError(btr.TB(), err, "error extracting HLV %q: %v", revHistory, err) - hlv = *existingVersion + incomingHLV, err = db.FromHistoryForHLV(revHistory) + require.NoError(btr.TB(), err, "error extracting HLV history %q: %v", revHistory, err) + hlv = *incomingHLV } - v, err := db.ParseVersion(revID) + incomingCV, err := db.ParseVersion(revID) require.NoError(btr.TB(), err, "error parsing version %q: %v", revID, err) - newVersion = DocVersion{CV: v} - require.NoError(btr.TB(), hlv.AddVersion(v)) + incomingVersion = DocVersion{CV: incomingCV} + + // fetch client's latest version to do conflict check and resolution + latestClientRev, err := doc._latestRev() + require.NoError(btc.TB(), err, "couldn't get latest revision for doc %q", docID) + if latestClientRev != nil { + clientCV := latestClientRev.version.CV + + // safety check - ensure SG 
is not sending a rev that we already had - ensures changes feed messaging is working correctly to prevent re-sending revisions the client already has + if clientCV.SourceID == incomingCV.SourceID && clientCV.Value == incomingCV.Value { + require.FailNowf(btc.TB(), "incoming revision is equal to client revision", "incoming revision %v is equal to client revision %v - should've been filtered via changes response before ending up as a rev", incomingCV, clientCV) + } + + // incoming rev older than stored client version and comes from a different source - need to resolve + if incomingCV.Value < clientCV.Value && incomingCV.SourceID != clientCV.SourceID { + btc.TB().Logf("Detected conflict on pull of doc %q (clientCV:%v - incomingCV:%v incomingHLV:%#v)", docID, clientCV, incomingCV, incomingHLV) + switch btc.BlipTesterClientOpts.ConflictResolver { + case ConflictResolverLastWriteWins: + // local wins so write the local body back as a new resolved version (based on incoming HLV) to push + body = latestClientRev.body + v := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.id), Value: uint64(time.Now().UnixNano())} + require.NoError(btc.TB(), hlv.AddVersion(v), "couldn't add resolved version into client HLV") + versionToWrite = DocVersion{CV: v} + hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value) + default: + btc.TB().Fatalf("Unknown conflict resolver %q - cannot resolve detected conflict", btc.BlipTesterClientOpts.ConflictResolver) + } + } else { + // no conflict - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} + } + } else { + // no existing rev - accept incoming rev + versionToWrite = DocVersion{CV: incomingCV} + } + require.NoError(btc.TB(), hlv.AddVersion(versionToWrite.CV), "couldn't add new CV into doc HLV") } else { - newVersion = DocVersion{RevTreeID: revID} + versionToWrite = DocVersion{RevTreeID: revID} + incomingVersion = versionToWrite } docRev := clientDocRev{ clientSeq: newClientSeq, - version: newVersion, + version: versionToWrite, HLV: hlv, body: body, message: msg, @@ -835,12 +934,16 @@ func (btr *BlipTesterReplicator) initHandlers(btc *BlipTesterClient) { // store the new sequence for a replaced rev for tests waiting for this specific rev doc._seqsByVersions[replacedVersion] = newClientSeq } - doc._latestServerVersion = newVersion + // store the _incoming_ version - not versionToWrite - since we may have written a resolved conflict which will need pushing back + doc._latestServerVersion = incomingVersion if !msg.NoReply() { response := msg.Response() response.SetBody([]byte(`[]`)) } + + // new sequence written, wake up changes feeds for push + btcr._seqCond.Broadcast() } btr.bt.blipContext.HandlerForProfile[db.MessageGetAttachment] = func(msg *blip.Message) { @@ -943,7 +1046,7 @@ func (btc *BlipTesterCollectionClient) updateLastReplicatedVersion(docID string, defer btc.seqLock.Unlock() doc, ok := btc._getClientDoc(docID) if !ok { - require.FailNow(btc.TB(), "docID %q not found in _seqFromDocID", docID) + require.FailNowf(btc.TB(), "doc not found", "docID %q", docID) return } doc.setLatestServerVersion(version) @@ -954,7 +1057,7 @@ func (btc *BlipTesterCollectionClient) getLastReplicatedVersion(docID string) (v defer btc.seqLock.Unlock() doc, ok := btc._getClientDoc(docID) if !ok { - require.FailNow(btc.TB(), "docID %q not found in _seqFromDocID", docID) + require.FailNowf(btc.TB(), "doc not found", "docID %q", docID) return DocVersion{}, false } doc.lock.RLock() @@ -1007,6 +1110,12 @@ func (btcRunner *BlipTestClientRunner) NewBlipTesterClientOptsWithRT(rt *RestTes if 
!opts.AllowCreationWithoutBlipTesterClientRunner && !btcRunner.initialisedInsideRunnerCode { require.FailNow(btcRunner.TB(), "must initialise BlipTesterClient inside Run() method") } + if opts.ConflictResolver == "" { + opts.ConflictResolver = ConflictResolverDefault + } + if !opts.ConflictResolver.IsValid() { + require.FailNowf(btcRunner.TB(), "invalid conflict resolver", "invalid conflict resolver %q", opts.ConflictResolver) + } id, err := uuid.NewRandom() require.NoError(btcRunner.TB(), err) if opts.SourceID == "" { @@ -1150,7 +1259,7 @@ func (btcRunner *BlipTestClientRunner) Collection(clientID uint32, collectionNam return collectionClient } } - require.FailNow(btcRunner.clients[clientID].TB(), "Could not find collection %s in BlipTesterClient", collectionName) + require.FailNowf(btcRunner.clients[clientID].TB(), "unknown collection", "Could not find collection %s in BlipTesterClient", collectionName) return nil } @@ -1465,7 +1574,7 @@ func (btc *BlipTesterCollectionClient) StartPullSince(options BlipTesterPullOpti errorDomain := subChangesResponse.Properties["Error-Domain"] errorCode := subChangesResponse.Properties["Error-Code"] if errorDomain != "" && errorCode != "" { - require.FailNowf(btc.TB(), "error %s %s from subChanges with body: %s", errorDomain, errorCode, string(rspBody)) + require.FailNowf(btc.TB(), "error from subchanges", "error %s %s from subChanges with body: %s", errorDomain, errorCode, string(rspBody)) } } @@ -1748,7 +1857,7 @@ func (btc *BlipTesterCollectionClient) ProcessInlineAttachments(inputBody []byte // push the stub as-is continue } - require.FailNow(btc.TB(), "couldn't find data or stub property for inline attachment %s:%v", attachmentName, inlineAttachment) + require.FailNowf(btc.TB(), "couldn't find data or stub property for inline attachment", "att name %s:%v", attachmentName, inlineAttachment) } // Transform inline attachment data into metadata @@ -1791,7 +1900,7 @@ func (btc *BlipTesterCollectionClient) GetVersion(docID string, docVersion DocVe rev, ok := doc._revisionsBySeq[revSeq] if !ok { - require.FailNow(btc.TB(), "seq %q for docID %q found but no rev in _seqStore", revSeq, docID) + require.FailNowf(btc.TB(), "no rev seq in _seqStore", "seq %q for docID %q found but no rev in _seqStore", revSeq, docID) return nil, false } diff --git a/topologytest/couchbase_lite_mock_peer_test.go b/topologytest/couchbase_lite_mock_peer_test.go index 05000a4827..7f82b62ee8 100644 --- a/topologytest/couchbase_lite_mock_peer_test.go +++ b/topologytest/couchbase_lite_mock_peer_test.go @@ -65,7 +65,11 @@ func (p *CouchbaseLiteMockPeer) GetDocument(dsName sgbucket.DataStoreName, docID bodyBytes, meta := p.getLatestDocVersion(dsName, docID) require.NotNil(p.TB(), meta, "docID:%s not found on %s", docID, p) var body db.Body - require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body)) + // it's easier if all clients can return consistent bodies for tombstones + // lets just settle on nil, since we still need special handling anyway for `` vs `{}` so unmarshal doesn't barf + if len(bodyBytes) > 0 && string(bodyBytes) != base.EmptyDocument { + require.NoError(p.TB(), base.JSONUnmarshal(bodyBytes, &body)) + } return *meta, body } diff --git a/topologytest/couchbase_server_peer_test.go b/topologytest/couchbase_server_peer_test.go index 0ca143ff3f..ae41ca7682 100644 --- a/topologytest/couchbase_server_peer_test.go +++ b/topologytest/couchbase_server_peer_test.go @@ -342,6 +342,8 @@ func getBodyAndVersion(peer Peer, collection sgbucket.DataStore, docID string) ( 
require.NoError(peer.TB(), err) // get hlv to construct DocVersion var body db.Body - require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body)) + if len(docBytes) > 0 { + require.NoError(peer.TB(), base.JSONUnmarshal(docBytes, &body)) + } return getDocVersion(docID, peer, cas, xattrs), body } diff --git a/topologytest/hlv_test.go b/topologytest/hlv_test.go index 24f45b1882..20d5a4632e 100644 --- a/topologytest/hlv_test.go +++ b/topologytest/hlv_test.go @@ -15,6 +15,7 @@ import ( "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/db" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -67,6 +68,40 @@ func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, p } } +// waitForConvergingVersion waits for the same document version to reach all peers. +func waitForConvergingVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, replications Replications, docID string) { + t.Logf("waiting for converged doc versions across all peers") + if !assert.EventuallyWithT(t, func(c *assert.CollectT) { + for peerAid, peerA := range peers.SortedPeers() { + docMetaA, bodyA := peerA.GetDocument(dsName, docID) + for peerBid, peerB := range peers.SortedPeers() { + if peerAid == peerBid { + continue + } + docMetaB, bodyB := peerB.GetDocument(dsName, docID) + cvA, cvB := docMetaA.CV(t), docMetaB.CV(t) + require.Equalf(c, cvA, cvB, "CV mismatch: %s:%#v != %s:%#v", peerAid, docMetaA, peerBid, docMetaB) + require.Equalf(c, bodyA, bodyB, "body mismatch: %s:%s != %s:%s", peerAid, bodyA, peerBid, bodyB) + } + } + }, totalWaitTime, pollInterval) { + // do if !assert->require pattern so we can delay PrintGlobalDocState evaluation + require.FailNowf(t, "Peers did not converge on version", "Global state for doc %q on all peers:\n%s\nReplications: %s", docID, peers.PrintGlobalDocState(t, dsName, docID), replications) + } +} + +// PrintGlobalDocState returns the current state of a document across all peers, and also logs it on `t`. +func (p Peers) PrintGlobalDocState(t testing.TB, dsName base.ScopeAndCollectionName, docID string) string { + var globalState strings.Builder + for peerName, peer := range p { + docMeta, body := peer.GetDocument(dsName, docID) + globalState.WriteString(fmt.Sprintf("====\npeer(%s)\n----\n%#v\nbody:%v\n", peerName, docMeta, body)) + } + globalStateStr := globalState.String() + t.Logf("Global doc %q state for all peers:\n%s", docID, globalStateStr) + return globalStateStr +} + // removeSyncGatewayBackingPeers will check if there is sync gateway in topology, if so will track the backing CBS // so we can skip creating docs on these peers (avoiding conflicts between docs created on the SGW and cbs) func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool { @@ -82,67 +117,43 @@ func removeSyncGatewayBackingPeers(peers map[string]Peer) map[string]bool { return peersToRemove } -// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents, then -// returns the last peer to have a doc created on it -func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) { +// createConflictingDocs will create a doc on each peer of the same doc ID to create conflicting documents. 
+// It is not known at this stage which write the "winner" will be, since conflict resolution can happen at replication time which may not be LWW, or may be LWW but with a new value. +func createConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) - documentVersion := make([]BodyAndVersion, 0, len(peers)) for peerName, peer := range peers { if backingPeers[peerName] { continue } - if peer.Type() == PeerTypeCouchbaseLite { - // FIXME: Skipping Couchbase Lite tests for multi actor conflicts, CBG-4434 - continue - } docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "create"}`, peerName, topologyDescription)) docVersion := peer.CreateDocument(dsName, docID, docBody) t.Logf("%s - createVersion: %#v", peerName, docVersion.docMeta) - documentVersion = append(documentVersion, docVersion) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } -// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations, then -// returns the last peer to have a doc updated on it. -func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) (lastWrite BodyAndVersion) { +// updateConflictingDocs will update a doc on each peer of the same doc ID to create conflicting document mutations +func updateConflictingDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID, topologyDescription string) { backingPeers := removeSyncGatewayBackingPeers(peers) - var documentVersion []BodyAndVersion for peerName, peer := range peers { if backingPeers[peerName] { continue } docBody := []byte(fmt.Sprintf(`{"activePeer": "%s", "topology": "%s", "action": "update"}`, peerName, topologyDescription)) docVersion := peer.WriteDocument(dsName, docID, docBody) - t.Logf("updateVersion: %#v", docVersion.docMeta) - documentVersion = append(documentVersion, docVersion) + t.Logf("%s - updateVersion: %#v", peerName, docVersion.docMeta) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } -// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions, then -// returns the last peer to have a doc deleted on it -func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) (lastWrite BodyAndVersion) { +// deleteConflictDocs will delete a doc on each peer of the same doc ID to create conflicting document deletions +func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string) { backingPeers := removeSyncGatewayBackingPeers(peers) - var documentVersion []BodyAndVersion for peerName, peer := range peers { if backingPeers[peerName] { continue } deleteVersion := peer.DeleteDocument(dsName, docID) - t.Logf("deleteVersion: %#v", deleteVersion) - documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName}) + t.Logf("%s - deleteVersion: %#v", peerName, deleteVersion) } - index := len(documentVersion) - 1 - lastWrite = documentVersion[index] - - return lastWrite } // getDocID returns a unique doc ID for the test case. Note: when running with Couchbase Server and -count > 1, this will return duplicate IDs for count 2 and higher and they can conflict due to the way bucket pool works. 
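A minimal sketch (not part of the diff) of what "LWW but with a new value" means in HLV terms, using the db package identifiers exercised by the blip tester client changes above; the helper name is hypothetical. When last-write-wins keeps the local body, the client mints a brand-new CV for the resolved write and demotes the losing incoming CV into the previous versions map, so the resolution is pushed back to Sync Gateway as a newer version rather than being dropped.

	// resolveLWWKeepLocal applies a last-write-wins outcome to the incoming HLV: the resolved
	// write gets a fresh client-minted CV and the losing CV is retained as a previous version.
	func resolveLWWKeepLocal(hlv *db.HybridLogicalVector, incomingCV db.Version, clientID uint32) (db.Version, error) {
		resolved := db.Version{SourceID: fmt.Sprintf("btc-%d", clientID), Value: uint64(time.Now().UnixNano())}
		if err := hlv.AddVersion(resolved); err != nil {
			return db.Version{}, err
		}
		hlv.SetPreviousVersion(incomingCV.SourceID, incomingCV.Value)
		return resolved, nil
	}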
diff --git a/topologytest/multi_actor_conflict_test.go b/topologytest/multi_actor_conflict_test.go index ce2290d40f..7825f2a0ff 100644 --- a/topologytest/multi_actor_conflict_test.go +++ b/topologytest/multi_actor_conflict_test.go @@ -9,7 +9,6 @@ package topologytest import ( - "strings" "testing" ) @@ -24,10 +23,9 @@ func TestMultiActorConflictCreate(t *testing.T) { replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) - + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } @@ -42,24 +40,21 @@ func TestMultiActorConflictCreate(t *testing.T) { // 7. assert that the documents are deleted on all peers and have hlv sources equal to the number of active peers func TestMultiActorConflictUpdate(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { - if strings.Contains(topology.description, "CBL") { - t.Skip("CBL actor can generate conflicts and push replication fails with conflict for doc in blip tester CBL-4267") - } t.Run(topology.description, func(t *testing.T) { collectionName, peers, replications := setupTests(t, topology) replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - docVersion = updateConflictingDocs(t, collectionName, peers, docID, topology.description) + updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } @@ -74,24 +69,21 @@ func TestMultiActorConflictUpdate(t *testing.T) { // 7. assert that the documents are deleted on all peers and have hlv sources equal to the number of active peers func TestMultiActorConflictDelete(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { - if strings.Contains(topology.description, "CBL") { - t.Skip("CBL actor can generate conflicts and push replication fails with conflict for doc in blip tester CBL-4267") - } t.Run(topology.description, func(t *testing.T) { collectionName, peers, replications := setupTests(t, topology) replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - lastWrite := deleteConflictDocs(t, collectionName, peers, docID) + deleteConflictDocs(t, collectionName, peers, docID) replications.Start() - waitForTombstoneVersion(t, collectionName, peers, replications, docID, lastWrite) + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } @@ -110,31 +102,28 @@ func TestMultiActorConflictDelete(t *testing.T) { // 11. 
assert that the documents are resurrected on all peers and have hlv sources equal to the number of active peers and the document body is equivalent to the last write func TestMultiActorConflictResurrect(t *testing.T) { for _, topology := range append(simpleTopologies, Topologies...) { - if strings.Contains(topology.description, "CBL") { - t.Skip("CBL actor can generate conflicts and push replication fails with conflict for doc in blip tester CBL-4267") - } t.Run(topology.description, func(t *testing.T) { collectionName, peers, replications := setupTests(t, topology) replications.Stop() docID := getDocID(t) - docVersion := createConflictingDocs(t, collectionName, peers, docID, topology.description) + createConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, docVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - lastWrite := deleteConflictDocs(t, collectionName, peers, docID) + deleteConflictDocs(t, collectionName, peers, docID) replications.Start() - waitForTombstoneVersion(t, collectionName, peers, replications, docID, lastWrite) + waitForConvergingVersion(t, collectionName, peers, replications, docID) replications.Stop() - lastWriteVersion := updateConflictingDocs(t, collectionName, peers, docID, topology.description) + updateConflictingDocs(t, collectionName, peers, docID, topology.description) replications.Start() - waitForVersionAndBody(t, collectionName, peers, replications, docID, lastWriteVersion) + waitForConvergingVersion(t, collectionName, peers, replications, docID) }) } } diff --git a/topologytest/sync_gateway_peer_test.go b/topologytest/sync_gateway_peer_test.go index 887047799b..c05665044d 100644 --- a/topologytest/sync_gateway_peer_test.go +++ b/topologytest/sync_gateway_peer_test.go @@ -63,7 +63,11 @@ func (p *SyncGatewayPeer) GetDocument(dsName sgbucket.DataStoreName, docID strin collection, ctx := p.getCollection(dsName) doc, err := collection.GetDocument(ctx, docID, db.DocUnmarshalAll) require.NoError(p.TB(), err) - return DocMetadataFromDocument(doc), doc.Body(ctx) + var body db.Body + if !doc.IsDeleted() { + body = doc.Body(ctx) + } + return DocMetadataFromDocument(doc), body } // CreateDocument creates a document on the peer. The test will fail if the document already exists. diff --git a/topologytest/version_test.go b/topologytest/version_test.go index 45ae1f2e5e..387d005053 100644 --- a/topologytest/version_test.go +++ b/topologytest/version_test.go @@ -69,7 +69,7 @@ func DocMetadataFromDocument(doc *db.Document) DocMetadata { } func (v DocMetadata) GoString() string { - return fmt.Sprintf("DocMetadata{\nDocID:%s\n\tRevTreeID:%s\n\tHLV:%+v\n\tMou:%+v\n\tCas:%d\n\tImplicitHLV:%+v\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV) + return fmt.Sprintf("DocMetadata{\n\tDocID: %q,\n\tRevTreeID:%q,\n\tHLV:%+v,\n\tMou:%+v,\n\tCas:%d,\n\tImplicitHLV:%+v,\n}", v.DocID, v.RevTreeID, v.HLV, v.Mou, v.Cas, v.ImplicitHLV) } // DocMetadataFromDocVersion returns metadata DocVersion from the given document and version.
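A brief sketch (illustrative, not part of the diff) of the convention the peer GetDocument changes above converge on: waitForConvergingVersion compares bodies across peers, so every peer type has to report the same canonical body for a deleted document, and nil is the value the implementations settle on. The helper name is hypothetical; base.EmptyDocument and base.JSONUnmarshal are the existing helpers already used in the diff.

	// canonicalBody mirrors the tombstone handling adopted across the peer implementations:
	// deletions (and empty raw bodies) surface as a nil db.Body, everything else is unmarshalled JSON.
	func canonicalBody(t testing.TB, raw []byte, deleted bool) db.Body {
		if deleted || len(raw) == 0 || string(raw) == base.EmptyDocument {
			return nil
		}
		var body db.Body
		require.NoError(t, base.JSONUnmarshal(raw, &body))
		return body
	}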