Skip to content

Commit

Permalink
storage: don't mutate caller's transaction in executeWriteBatch
Browse files Browse the repository at this point in the history
With proposer-evaluated Raft, doing so would mutate the caller's transaction
proto, with unclear (but definitely bad) implications.

Do not mutate ba.Requests in optimizePuts.
This fixes a source of data races in the experimental proposer-evaluated KV
PR #10327.

Updated `TestOptimizePuts` so that prior to the fix, it failed with

```
--- FAIL: TestOptimizePuts (0.01s)
       	replica_test.go:1522: 2: optimizePuts mutated the original request
        slice: [[0].Put.Blind: false != true [1].Put.Blind: false != true
        [2].Put.Blind: false != true [3].Put.Blind: false != true
        [4].Put.Blind: false != true [5].Put.Blind: false != true
        [6].Put.Blind: false != true [7].Put.Blind: false != true
        [8].Put.Blind: false != true [9].Put.Blind: false != true]
```
  • Loading branch information
tbg committed Nov 2, 2016
1 parent c189c5a commit ab87217
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 14 deletions.
46 changes: 33 additions & 13 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -3341,11 +3341,16 @@ func isOnePhaseCommit(ba roachpb.BatchRequest) bool {
// range of keys being written is empty. If so, then the run can be
// set to put "blindly", meaning no iterator need be used to read
// existing values during the MVCC write.
func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinctSpans bool) {
// The caller should use the returned slice (which is either equal to
// the input slice, or has been shallow-copied appropriately to avoid
// mutating the original requests).
func optimizePuts(
batch engine.ReadWriter, origReqs []roachpb.RequestUnion, distinctSpans bool,
) []roachpb.RequestUnion {
var minKey, maxKey roachpb.Key
var unique map[string]struct{}
if !distinctSpans {
unique = make(map[string]struct{}, len(reqs))
unique = make(map[string]struct{}, len(origReqs))
}
// Returns false on occurrence of a duplicate key.
maybeAddPut := func(key roachpb.Key) bool {
Expand All @@ -3365,7 +3370,8 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
return true
}

for i, r := range reqs {
firstUnoptimizedIndex := len(origReqs)
for i, r := range origReqs {
switch t := r.GetInner().(type) {
case *roachpb.PutRequest:
if maybeAddPut(t.Key) {
Expand All @@ -3376,12 +3382,12 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
continue
}
}
reqs = reqs[:i]
firstUnoptimizedIndex = i
break
}

if len(reqs) < optimizePutThreshold { // don't bother if below this threshold
return
if firstUnoptimizedIndex < optimizePutThreshold { // don't bother if below this threshold
return origReqs
}
iter := batch.NewIterator(false /* total order iterator */)
defer iter.Close()
Expand All @@ -3397,21 +3403,27 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
}
// Set the prefix of the run which is being written to virgin
// keyspace to "blindly" put values.
for _, r := range reqs {
if iterKey == nil || bytes.Compare(iterKey, r.GetInner().Header().Key) > 0 {
switch t := r.GetInner().(type) {
reqs := append([]roachpb.RequestUnion(nil), origReqs...)
for i := range reqs[:firstUnoptimizedIndex] {
inner := reqs[i].GetInner()
if iterKey == nil || bytes.Compare(iterKey, inner.Header().Key) > 0 {
switch t := inner.(type) {
case *roachpb.PutRequest:
t.Blind = true
shallow := *t
shallow.Blind = true
reqs[i].MustSetInner(&shallow)
case *roachpb.ConditionalPutRequest:
t.Blind = true
shallow := *t
shallow.Blind = true
reqs[i].MustSetInner(&shallow)
default:
log.Fatalf(context.TODO(), "unexpected non-put request: %s", t)
}
}
}
return reqs
}

// TODO(tschottdorf): Reliance on mutating `ba.Txn` should be dealt with.
func (r *Replica) executeBatch(
ctx context.Context,
idKey storagebase.CmdIDKey,
Expand All @@ -3437,7 +3449,7 @@ func (r *Replica) executeBatch(

// Optimize any contiguous sequences of put and conditional put ops.
if len(ba.Requests) >= optimizePutThreshold {
optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
}

// Update the node clock with the serviced request. This maintains a high
Expand All @@ -3450,6 +3462,14 @@ func (r *Replica) executeBatch(
return nil, ProposalData{}, roachpb.NewErrorWithTxn(err, ba.Header.Txn)
}

// Create a shallow clone of the transaction. We only modify a few
// non-pointer fields (BatchIndex, WriteTooOld, Timestamp), so this saves
// a few allocs.
if ba.Txn != nil {
txnShallow := *ba.Txn
ba.Txn = &txnShallow
}

var pd ProposalData
for index, union := range ba.Requests {
// Execute the command.
Expand Down
53 changes: 52 additions & 1 deletion pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,24 @@ func TestOptimizePuts(t *testing.T) {
for _, r := range c.reqs {
batch.Add(r)
}
optimizePuts(tc.engine, batch.Requests, false)
// Make a deep clone of the requests slice. We need a deep clone
// because the regression which is prevented here changed data on the
// individual requests, and not the slice.
goldenRequests := append([]roachpb.RequestUnion(nil), batch.Requests...)
for i := range goldenRequests {
clone := protoutil.Clone(goldenRequests[i].GetInner()).(roachpb.Request)
goldenRequests[i].MustSetInner(clone)
}
// Save the original slice, allowing us to assert that it doesn't
// change when it is passed to optimizePuts.
oldRequests := batch.Requests
batch.Requests = optimizePuts(tc.engine, batch.Requests, false)
if !reflect.DeepEqual(goldenRequests, oldRequests) {
t.Fatalf("%d: optimizePuts mutated the original request slice: %s",
i, pretty.Diff(goldenRequests, oldRequests),
)
}

blind := []bool{}
for _, r := range batch.Requests {
switch t := r.GetInner().(type) {
Expand Down Expand Up @@ -6301,3 +6318,37 @@ func TestReplicaTimestampCacheBumpNotLost(t *testing.T) {
)
}
}

func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
defer leaktest.AfterTest(t)()

tc := testContext{}
tc.Start(t)
defer tc.Stop()

ctx := tc.rng.AnnotateCtx(context.TODO())
key := keys.LocalMax

txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
origTxn := txn.Clone()

var ba roachpb.BatchRequest
ba.Txn = txn
ba.Timestamp = txn.Timestamp
txnPut := putArgs(key, []byte("foo"))
// Add two puts (the second one gets BatchIndex 1, which was a failure mode
// observed when this test was written and the failure fixed). Originally
// observed in #10137, where this became relevant (before that, evaluation
// happened downstream of Raft, so a serialization pass always took place).
ba.Add(&txnPut)
ba.Add(&txnPut)

batch, _, _, _, pErr := tc.rng.executeWriteBatch(ctx, makeIDKey(), ba)
defer batch.Close()
if pErr != nil {
t.Fatal(pErr)
}
if !reflect.DeepEqual(&origTxn, txn) {
t.Fatalf("transaction was mutated during evaluation: %s", pretty.Diff(&origTxn, txn))
}
}

0 comments on commit ab87217

Please sign in to comment.