storage: do not mutate ba.Requests in optimizePuts and executeWriteBatch #10419

Merged · 1 commit · Nov 3, 2016
pkg/storage/replica.go (46 changes: 33 additions & 13 deletions)
@@ -3341,11 +3341,16 @@ func isOnePhaseCommit(ba roachpb.BatchRequest) bool {
 // range of keys being written is empty. If so, then the run can be
 // set to put "blindly", meaning no iterator need be used to read
 // existing values during the MVCC write.
-func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinctSpans bool) {
+// The caller should use the returned slice (which is either equal to
+// the input slice, or has been shallow-copied appropriately to avoid
+// mutating the original requests).
+func optimizePuts(
+	batch engine.ReadWriter, origReqs []roachpb.RequestUnion, distinctSpans bool,
+) []roachpb.RequestUnion {
 	var minKey, maxKey roachpb.Key
 	var unique map[string]struct{}
 	if !distinctSpans {
-		unique = make(map[string]struct{}, len(reqs))
+		unique = make(map[string]struct{}, len(origReqs))
 	}
 	// Returns false on occurrence of a duplicate key.
 	maybeAddPut := func(key roachpb.Key) bool {
@@ -3365,7 +3370,8 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
 		return true
 	}
 
-	for i, r := range reqs {
+	firstUnoptimizedIndex := len(origReqs)
+	for i, r := range origReqs {
 		switch t := r.GetInner().(type) {
 		case *roachpb.PutRequest:
 			if maybeAddPut(t.Key) {
@@ -3376,12 +3382,12 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
 				continue
 			}
 		}
-		reqs = reqs[:i]
+		firstUnoptimizedIndex = i
 		break
 	}
 
-	if len(reqs) < optimizePutThreshold { // don't bother if below this threshold
-		return
+	if firstUnoptimizedIndex < optimizePutThreshold { // don't bother if below this threshold
+		return origReqs
 	}
 	iter := batch.NewIterator(false /* total order iterator */)
 	defer iter.Close()
@@ -3397,21 +3403,27 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
 	}
 	// Set the prefix of the run which is being written to virgin
 	// keyspace to "blindly" put values.
-	for _, r := range reqs {
-		if iterKey == nil || bytes.Compare(iterKey, r.GetInner().Header().Key) > 0 {
-			switch t := r.GetInner().(type) {
+	reqs := append([]roachpb.RequestUnion(nil), origReqs...)
+	for i := range reqs[:firstUnoptimizedIndex] {
+		inner := reqs[i].GetInner()
+		if iterKey == nil || bytes.Compare(iterKey, inner.Header().Key) > 0 {
+			switch t := inner.(type) {
 			case *roachpb.PutRequest:
-				t.Blind = true
+				shallow := *t
+				shallow.Blind = true
+				reqs[i].MustSetInner(&shallow)
 			case *roachpb.ConditionalPutRequest:
-				t.Blind = true
+				shallow := *t
+				shallow.Blind = true
+				reqs[i].MustSetInner(&shallow)
 			default:
 				log.Fatalf(context.TODO(), "unexpected non-put request: %s", t)
 			}
 		}
 	}
+	return reqs
 }
 
-// TODO(tschottdorf): Reliance on mutating `ba.Txn` should be dealt with.
 func (r *Replica) executeBatch(
 	ctx context.Context,
 	idKey storagebase.CmdIDKey,
@@ -3437,7 +3449,7 @@ func (r *Replica) executeBatch(
 
 	// Optimize any contiguous sequences of put and conditional put ops.
 	if len(ba.Requests) >= optimizePutThreshold {
-		optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
+		ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
 	}
 
 	// Update the node clock with the serviced request. This maintains a high
@@ -3450,6 +3462,14 @@ func (r *Replica) executeBatch(
 		return nil, ProposalData{}, roachpb.NewErrorWithTxn(err, ba.Header.Txn)
 	}
 
+	// Create a shallow clone of the transaction. We only modify a few
+	// non-pointer fields (BatchIndex, WriteTooOld, Timestamp), so this saves
+	// a few allocs.
+	if ba.Txn != nil {
+		txnShallow := *ba.Txn
+		ba.Txn = &txnShallow
+	}
+
 	var pd ProposalData
 	for index, union := range ba.Requests {
 		// Execute the command.
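For illustration, here is a minimal standalone sketch of the copy-on-write pattern the new optimizePuts follows: return the input slice unchanged when there is nothing to modify, and otherwise shallow-copy the slice and swap cloned elements into the copy so that the caller's requests are never mutated. The Request type and markBlind function below are hypothetical stand-ins, not the CockroachDB API.

```go
package main

import "fmt"

// Request stands in for roachpb.RequestUnion in this sketch; the real type
// wraps protobuf messages rather than a plain struct.
type Request struct {
	Key   string
	Blind bool
}

// markBlind returns a slice in which the first n entries have Blind set.
// Neither the input slice nor its elements are modified; callers must use
// the returned slice, which may or may not alias the input.
func markBlind(orig []*Request, n int) []*Request {
	if n == 0 {
		return orig // nothing to optimize: hand back the caller's slice
	}
	// Copy the slice so the index assignments below write into a fresh
	// backing array, never into the caller's.
	reqs := append([]*Request(nil), orig...)
	for i := range reqs[:n] {
		shallow := *reqs[i] // clone the element before flipping its flag
		shallow.Blind = true
		reqs[i] = &shallow
	}
	return reqs
}

func main() {
	orig := []*Request{{Key: "a"}, {Key: "b"}, {Key: "c"}}
	out := markBlind(orig, 2)
	fmt.Println(orig[0].Blind, out[0].Blind) // false true: the original is untouched
}
```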
pkg/storage/replica_test.go (53 changes: 52 additions & 1 deletion)
@@ -1501,7 +1501,24 @@ func TestOptimizePuts(t *testing.T) {
 		for _, r := range c.reqs {
 			batch.Add(r)
 		}
-		optimizePuts(tc.engine, batch.Requests, false)
+		// Make a deep clone of the requests slice. We need a deep clone
+		// because the regression which is prevented here changed data on the
+		// individual requests, and not the slice.
+		goldenRequests := append([]roachpb.RequestUnion(nil), batch.Requests...)
+		for i := range goldenRequests {
+			clone := protoutil.Clone(goldenRequests[i].GetInner()).(roachpb.Request)
+			goldenRequests[i].MustSetInner(clone)
+		}
+		// Save the original slice, allowing us to assert that it doesn't
+		// change when it is passed to optimizePuts.
+		oldRequests := batch.Requests
+		batch.Requests = optimizePuts(tc.engine, batch.Requests, false)
+		if !reflect.DeepEqual(goldenRequests, oldRequests) {
+			t.Fatalf("%d: optimizePuts mutated the original request slice: %s",
+				i, pretty.Diff(goldenRequests, oldRequests),
+			)
+		}
+
 		blind := []bool{}
 		for _, r := range batch.Requests {
 			switch t := r.GetInner().(type) {
@@ -6301,3 +6318,37 @@ func TestReplicaTimestampCacheBumpNotLost(t *testing.T) {
 		)
 	}
 }
+
+func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	tc := testContext{}
+	tc.Start(t)
+	defer tc.Stop()
+
+	ctx := tc.rng.AnnotateCtx(context.TODO())
+	key := keys.LocalMax
+
+	txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
+	origTxn := txn.Clone()
+
+	var ba roachpb.BatchRequest
+	ba.Txn = txn
+	ba.Timestamp = txn.Timestamp
+	txnPut := putArgs(key, []byte("foo"))
+	// Add two puts (the second one gets BatchIndex 1, which was a failure mode
+	// observed when this test was written and the failure fixed). Originally
+	// observed in #10137, where this became relevant (before that, evaluation
+	// happened downstream of Raft, so a serialization pass always took place).
+	ba.Add(&txnPut)
+	ba.Add(&txnPut)
+
+	batch, _, _, _, pErr := tc.rng.executeWriteBatch(ctx, makeIDKey(), ba)
+	defer batch.Close()
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+	if !reflect.DeepEqual(&origTxn, txn) {
+		t.Fatalf("transaction was mutated during evaluation: %s", pretty.Diff(&origTxn, txn))
+	}
+}
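The transaction handling added to executeBatch above rests on the same idea as the request handling: copy once, then mutate only the copy. The following standalone sketch uses hypothetical Txn and Batch types (not the real roachpb definitions) to show why a single struct copy is enough when only value fields are written during evaluation, and why the caller's transaction stays pristine.

```go
package main

import "fmt"

// Txn and Batch are simplified stand-ins for roachpb.Transaction and
// roachpb.BatchRequest in this sketch.
type Txn struct {
	Timestamp   int64
	WriteTooOld bool
}

type Batch struct {
	Txn *Txn
}

// execute evaluates the batch. It clones the transaction once up front so
// that the scalar fields bumped during evaluation never reach the caller's
// copy; pointer or slice fields would still be shared, which is safe as
// long as only value fields are written.
func execute(b *Batch) {
	if b.Txn != nil {
		shallow := *b.Txn // a single allocation instead of a deep clone
		b.Txn = &shallow
	}
	b.Txn.WriteTooOld = true // mutations land on the local copy only
	b.Txn.Timestamp++
}

func main() {
	orig := &Txn{Timestamp: 10}
	b := &Batch{Txn: orig}
	execute(b)
	fmt.Println(orig.WriteTooOld, orig.Timestamp)   // false 10: caller's txn unchanged
	fmt.Println(b.Txn.WriteTooOld, b.Txn.Timestamp) // true 11
}
```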