From 8b5bafbaadca66f118672f1b89b08fab43a5fa6f Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sat, 8 Jun 2019 16:36:27 -0400 Subject: [PATCH] kv: collect and propogate all transaction state in DistSender on error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes “Bug 2” from https://github.com/cockroachdb/cockroach/issues/36089#issuecomment-499349410. This commit fixes the bug described in the referenced issue where part of a transaction proto update was lost when combining partial batches with some successes and some failures. This was leading to a state where a transaction restarted with a read timestamp below its maximum write timestamp from the previous epoch. Fixing this bug resolves the current issue in #36089. Release note: None --- pkg/kv/dist_sender.go | 20 +++++++++--- pkg/kv/dist_sender_server_test.go | 53 +++++++++++++++++-------------- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index 584a682226b0..66b43fc8b175 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -1125,6 +1125,15 @@ func (ds *DistSender) divideAndSendBatchToRanges( if resp.pErr != nil { if pErr == nil { pErr = resp.pErr + // Update the error's transaction with any new information from + // the batch response. This may contain interesting updates if + // the batch was parallelized and part of it succeeded. + pErr.UpdateTxn(br.Txn) + } else { + // Even though we ignore the second error, update the first + // error's transaction with any new information from the + // second error. This may contain interesting updates. + pErr.UpdateTxn(resp.pErr.GetTxn()) } continue } @@ -1139,21 +1148,22 @@ func (ds *DistSender) divideAndSendBatchToRanges( } // Combine the new response with the existing one (including updating - // the headers). + // the headers) if we haven't yet seen an error. if pErr == nil { if err := br.Combine(resp.reply, resp.positions); err != nil { pErr = roachpb.NewError(err) } + } else { + // Update the error's transaction with any new information from + // the batch response. This may contain interesting updates if + // the batch was parallelized and part of it succeeded. + pErr.UpdateTxn(resp.reply.Txn) } } // If we experienced an error, don't neglect to update the error's // attached transaction with any responses which were received. if pErr != nil { - // Update the error's transaction with any new information from - // the batch response. This may contain interesting updates if - // the batch was parallelized and part of it succeeded. - pErr.UpdateTxn(br.Txn) // If this is a write batch with any successful responses, but // we're ultimately returning an error, wrap the error with a // MixedSuccessError. diff --git a/pkg/kv/dist_sender_server_test.go b/pkg/kv/dist_sender_server_test.go index 099c0b83bb83..5459445fd1eb 100644 --- a/pkg/kv/dist_sender_server_test.go +++ b/pkg/kv/dist_sender_server_test.go @@ -1638,31 +1638,34 @@ func TestBadRequest(t *testing.T) { } // TestPropagateTxnOnError verifies that DistSender.Send properly propagates the -// txn data to a next iteration. Use the txn.ObservedTimestamps field to verify -// that. +// txn data to a next iteration. The test uses the txn.ObservedTimestamps field +// to verify that. func TestPropagateTxnOnError(t *testing.T) { defer leaktest.AfterTest(t)() - // Inject this observed timestamp into the part of the batch's response that - // does not result in an error. Even though the batch as a whole results in - // an error, the transaction should still propagate this information. - observedTS := roachpb.ObservedTimestamp{ - NodeID: 7, Timestamp: hlc.Timestamp{WallTime: 15}, - } - containsObservedTS := func(txn *roachpb.Transaction) bool { - for _, ts := range txn.ObservedTimestamps { - if ts.Equal(observedTS) { - return true + // Inject these two observed timestamps into the parts of the batch's + // response that does not result in an error. Even though the batch as a + // whole results in an error, the transaction should still propagate this + // information. + ot1 := roachpb.ObservedTimestamp{NodeID: 7, Timestamp: hlc.Timestamp{WallTime: 15}} + ot2 := roachpb.ObservedTimestamp{NodeID: 8, Timestamp: hlc.Timestamp{WallTime: 16}} + containsObservedTSs := func(txn *roachpb.Transaction) bool { + contains := func(ot roachpb.ObservedTimestamp) bool { + for _, ts := range txn.ObservedTimestamps { + if ts.Equal(ot) { + return true + } } + return false } - return false + return contains(ot1) && contains(ot2) } // Set up a filter to so that the first CPut operation will // get a ReadWithinUncertaintyIntervalError and so that the - // Put operation will return with the new observed timestamp. - keyA := roachpb.Key("a") - keyB := roachpb.Key("b") + // Put operations on either side of the CPut will each return + // with the new observed timestamp. + keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c") var numCPuts int32 var storeKnobs storage.StoreTestingKnobs storeKnobs.EvalKnobs.TestingEvalFilter = @@ -1671,7 +1674,9 @@ func TestPropagateTxnOnError(t *testing.T) { switch fArgs.Req.(type) { case *roachpb.PutRequest: if k.Equal(keyA) { - fArgs.Hdr.Txn.ObservedTimestamps = append(fArgs.Hdr.Txn.ObservedTimestamps, observedTS) + fArgs.Hdr.Txn.UpdateObservedTimestamp(ot1.NodeID, ot1.Timestamp) + } else if k.Equal(keyC) { + fArgs.Hdr.Txn.UpdateObservedTimestamp(ot2.NodeID, ot2.Timestamp) } case *roachpb.ConditionalPutRequest: if k.Equal(keyB) { @@ -1692,7 +1697,7 @@ func TestPropagateTxnOnError(t *testing.T) { defer s.Stopper().Stop(ctx) db := s.DB() - if err := setupMultipleRanges(ctx, db, "b"); err != nil { + if err := setupMultipleRanges(ctx, db, "b", "c"); err != nil { t.Fatal(err) } @@ -1702,10 +1707,9 @@ func TestPropagateTxnOnError(t *testing.T) { t.Fatal(err) } - // The following txn creates a batch request that is split - // into two requests: Put and CPut. The CPut operation will - // get a ReadWithinUncertaintyIntervalError and the txn will be - // retried. + // The following txn creates a batch request that is split into three + // requests: Put, CPut, and Put. The CPut operation will get a + // ReadWithinUncertaintyIntervalError and the txn will be retried. epoch := 0 if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { // Observe the commit timestamp to prevent refreshes. @@ -1716,13 +1720,13 @@ func TestPropagateTxnOnError(t *testing.T) { if epoch >= 2 { // ObservedTimestamps must contain the timestamp returned from the // Put operation. - if !containsObservedTS(proto) { + if !containsObservedTSs(proto) { t.Errorf("expected observed timestamp, found: %v", proto.ObservedTimestamps) } } else { // ObservedTimestamps must not contain the timestamp returned from // the Put operation. - if containsObservedTS(proto) { + if containsObservedTSs(proto) { t.Errorf("unexpected observed timestamp, found: %v", proto.ObservedTimestamps) } } @@ -1730,6 +1734,7 @@ func TestPropagateTxnOnError(t *testing.T) { b := txn.NewBatch() b.Put(keyA, "val") b.CPut(keyB, "new_val", origVal) + b.Put(keyC, "val2") err := txn.CommitInBatch(ctx, b) if epoch == 1 { if retErr, ok := err.(*roachpb.TransactionRetryWithProtoRefreshError); ok {