From 9964dd949c7d7ed8cd643fdb81fa80104ff70c1e Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Mon, 20 Aug 2018 15:05:39 -0400 Subject: [PATCH] kv: lie better about commits that are really rollbacks When a client tries to commit a txn that has performed writes at an old epoch but has only done reads at the current epoch, one of the TxnCoordSender interceptors turns the commit into a rollback (for reasons described in the code). This patch completes that interceptor's lie by updating the txn status upon success to COMMITTED instead of ABORTED. Since a commit is what the client asked for, it seems sane to pretend as best we can that that's what it got. In particular, this is important for the SQL module, where the ConnExecutor looks at the txn proto's status to discriminate between cases where a "1pc planNode" already committed an implicit txn and situations where it needs to commit it itself. This was causing the executor to think the txn was not committed and to attempt to commit again, which resulted in an error. I don't know if we like the ConnExecutor looking at the proto status, but I'll leave that alone. 
Fixes #28554 Fixes #28796 Release note: None --- pkg/kv/txn_coord_sender_test.go | 96 +++++++++++++++++++++++++++++ pkg/kv/txn_interceptor_heartbeat.go | 19 +++++- 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/pkg/kv/txn_coord_sender_test.go b/pkg/kv/txn_coord_sender_test.go index c52875b1f2df..53110847d305 100644 --- a/pkg/kv/txn_coord_sender_test.go +++ b/pkg/kv/txn_coord_sender_test.go @@ -20,10 +20,12 @@ import ( "fmt" "reflect" "strconv" + "strings" "sync/atomic" "testing" "time" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -2382,3 +2384,97 @@ func TestAnchorKey(t *testing.T) { t.Fatal(err) } } + +// TestCommitTurnedToRollback tests that the TxnCoordSender (or, rather, one of +// the interceptors) turns a commit into a rollback in situations where a txn +// has performed writes at old epochs but no writes at the current epoch. See +// the comment in txnHeartbeat about why this is needed. +// We check that the transformation happened (by looking at a trace) and that +// other things (e.g. the proto status) look like a committed transaction even +// though we technically performed a rollback. +func TestCommitTurnedToRollback(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + s, metrics, cleanupFn := setupMetricsTest(t) + defer cleanupFn() + + readKey := []byte("read") + writeKey := []byte("write") + value := []byte("value") + + // Start a transaction and read a key that we're going to modify outside the + // txn. This ensures that refreshing the txn will not succeed, so a restart + // will be necessary. + txn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn) + if _, err := txn.Get(ctx, readKey); err != nil { + t.Fatal(err) + } + + // Write the read key outside of the + // transaction, at a higher timestamp, which will necessitate a + // txn restart when the original read key span is updated. 
+ if err := s.DB.Put(ctx, readKey, value); err != nil { + t.Fatal(err) + } + + // Outside of the transaction, read the same key as will be + // written within the transaction. This means that future + // attempts to write will forward the txn timestamp. + if _, err := s.DB.Get(ctx, writeKey); err != nil { + t.Fatal(err) + } + + // This put will lay down an intent, txn timestamp will be bumped. + if err := txn.Put(ctx, writeKey, value); err != nil { + t.Fatal(err) + } + proto := txn.Serialize() + if !proto.OrigTimestamp.Less(proto.Timestamp) { + t.Errorf("expected timestamp to increase: %s", proto) + } + + // Attempt to commit, expect a retriable error. + err := txn.Commit(ctx) + assertTransactionRetryError(t, err) + + // Attempt to commit again, at the next epoch. This should succeed, and the + // commit should be turned to a rollback. + tr := tracing.NewTracer() + sp := tr.StartSpan("test", tracing.Recordable) + tracing.StartRecording(sp, tracing.SingleNodeRecording) + commitCtx := opentracing.ContextWithSpan(ctx, sp) + err = txn.Commit(commitCtx) + sp.Finish() + if err != nil { + t.Fatal(err) + } + + // Check that the commit metric has been incremented. + checkTxnMetrics(t, metrics, "commit turned to rollback", + 1 /* commits */, 0, 0, 0, 1 /* restarts */) + + // Check that the proto's status is the expected one for a commit. + if s := txn.Serialize().Status; s != roachpb.COMMITTED { + t.Fatalf("expected COMMITTED status, got: %s", s) + } + + // Look for a specific log message indicating that this test is not fooling + // itself and indeed we transformed a commit to a rollback. 
+ var found bool + for _, recSp := range tracing.GetRecording(sp) { + msg := "" + for _, l := range recSp.Logs { + for _, f := range l.Fields { + msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value) + } + } + if strings.Contains(msg, turningCommitToRollbackMsg) { + found = true + break + } + } + if !found { + t.Fatalf("didn't find trace message: %s", turningCommitToRollbackMsg) + } +} diff --git a/pkg/kv/txn_interceptor_heartbeat.go b/pkg/kv/txn_interceptor_heartbeat.go index a665cca068d5..d695fa3bfd14 100644 --- a/pkg/kv/txn_interceptor_heartbeat.go +++ b/pkg/kv/txn_interceptor_heartbeat.go @@ -27,6 +27,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" ) +const ( + turningCommitToRollbackMsg string = "Turning commit to rollback. All writes are part of old epochs." +) + // txnHeartbeat is a txnInterceptor in charge of the txn's heartbeat loop. // The heartbeat loop is started upon the first write. txnHeartbeat is also in // charge of prepending a BeginTransaction to the first write batch and possibly @@ -187,20 +191,22 @@ func (h *txnHeartbeat) SendLocked( // See if we can elide an EndTxn. We can elide it for read-only transactions. lastIndex := int32(len(ba.Requests) - 1) var elideEndTxn bool + var commitTurnedToRollback bool if haveEndTxn { // Are we writing now or have we written in the past? elideEndTxn = !h.mu.everSentBeginTxn if elideEndTxn { ba.Requests = ba.Requests[:lastIndex] - } else { + } else if etReq.Commit { // If all the writes were part of old epochs, we can turn the commit into // a rollback. Besides the rollback being potentially cheaper, this // transformation is important in situations where it's unclear if the txn // record exist: if it doesn't, then a commit would return a // TransactionStatusError where a rollback returns success. if h.mu.needBeginTxn { - log.VEventf(ctx, 2, "Turning commit in rollback. 
All writes are part of old epochs.") + log.VEventf(ctx, 2, turningCommitToRollbackMsg) etReq.Commit = false + commitTurnedToRollback = true } } } @@ -268,6 +274,15 @@ func (h *txnHeartbeat) SendLocked( resp := &roachpb.EndTransactionResponse{} resp.Txn = br.Txn br.Add(resp) + } else if commitTurnedToRollback { + // If we transformed a commit into a rollback, flip the status so that it + // looks like a successful commit to the higher layers. In particular, the + // SQL module looks at this status and wants it to be COMMITTED after a "1pc + // planNode" runs. + // + // Note: if we sent an EndTransaction and got back a successful response, we + // expect br.Txn to be filled. + br.Txn.Status = roachpb.COMMITTED } return br, nil