diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index acb9878e43fc..8355587faa49 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -211,6 +211,10 @@ type txnPipeliner struct { // to have succeeded. They will need to be proven before the transaction // can commit. ifWrites inFlightWriteSet + // The in-flight writes chain index is used to uniquely identify calls to + // chainToInFlightWrites, so that each call can limit itself to adding a + // single QueryIntent request to the batch per overlapping in-flight write. + ifWritesChainIndex int64 // The transaction's lock footprint contains spans where locks (replicated // and unreplicated) have been acquired at some point by the transaction. // The span set contains spans encompassing the keys from all intent writes @@ -504,19 +508,25 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch return ba } + // We may need to add QueryIntent requests to the batch. These variables are + // used to implement a copy-on-write scheme. forked := false oldReqs := ba.Requests - // TODO(nvanbenschoten): go 1.11 includes an optimization to quickly clear - // out an entire map. That might make it cost effective to maintain a single - // chainedKeys map between calls to this function. - var chainedKeys map[string]struct{} + + // We only want to add a single QueryIntent request to the BatchRequest per + // overlapping in-flight write. These counters allow us to accomplish this + // without a separate data structure. + tp.ifWritesChainIndex++ + chainIndex := tp.ifWritesChainIndex + chainCount := 0 + for i, ru := range oldReqs { req := ru.GetInner() // If we've chained onto all the in-flight writes (ifWrites.len() == - // len(chainedKeys)), we don't need to pile on more QueryIntents. So, only + // chainCount), we don't need to pile on more QueryIntents. So, only // do this work if that's not the case. - if tp.ifWrites.len() > len(chainedKeys) { + if tp.ifWrites.len() > chainCount { // For each conflicting in-flight write, add a QueryIntent request // to the batch to assert that it has succeeded and "chain" onto it. writeIter := func(w *inFlightWrite) { @@ -528,7 +538,7 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch forked = true } - if _, ok := chainedKeys[string(w.Key)]; !ok { + if w.chainIndex != chainIndex { // The write has not already been chained onto by an earlier // request in this batch. Add a QueryIntent request to the // batch (before the conflicting request) to ensure that we @@ -544,11 +554,12 @@ func (tp *txnPipeliner) chainToInFlightWrites(ba *kvpb.BatchRequest) *kvpb.Batch }) // Record that the key has been chained onto at least once - // in this batch so that we don't chain onto it again. - if chainedKeys == nil { - chainedKeys = make(map[string]struct{}) - } - chainedKeys[string(w.Key)] = struct{}{} + // in this batch so that we don't chain onto it again. If + // we fail to prove the write exists for any reason, future + // requests will use a different chainIndex and will try to + // prove the write again. + w.chainIndex = chainIndex + chainCount++ } } @@ -892,6 +903,10 @@ func (tp *txnPipeliner) hasAcquiredLocks() bool { // number. type inFlightWrite struct { roachpb.SequencedWrite + // chainIndex is used to avoid chaining on to the same in-flight write + // multiple times in the same batch. Each index uniquely identifies a + // call to txnPipeliner.chainToInFlightWrites. + chainIndex int64 } // Less implements the btree.Item interface. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 5a84659960cd..3d54d7559e63 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -1571,7 +1571,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) { }) require.Equal(t, []inFlightWrite{ - {roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}}, + {SequencedWrite: roachpb.SequencedWrite{Key: roachpb.Key("b"), Sequence: 11}}, }, ifWrites)