diff --git a/pkg/kv/txn_coord_sender_test.go b/pkg/kv/txn_coord_sender_test.go index c52875b1f2df..53110847d305 100644 --- a/pkg/kv/txn_coord_sender_test.go +++ b/pkg/kv/txn_coord_sender_test.go @@ -20,10 +20,12 @@ import ( "fmt" "reflect" "strconv" + "strings" "sync/atomic" "testing" "time" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -2382,3 +2384,97 @@ func TestAnchorKey(t *testing.T) { t.Fatal(err) } } + +// TestCommitTurnedToRollback tests that the TxnCoordSender (or, rather, one of +// the interceptors) turns a commit into a rollback in situations where a txn +// has performed writes at old epochs but no writes at the current epoch. See +// the comment in txnHeartbeat about why this is needed. +// We check that the transformation happened (by looking at a trace) and that +// other things (e.g. the proto status) look like a committed transaction even +// though we technically performed a rollback. +func TestCommitTurnedToRollback(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + s, metrics, cleanupFn := setupMetricsTest(t) + defer cleanupFn() + + readKey := []byte("read") + writeKey := []byte("write") + value := []byte("value") + + // Start a transaction and read a key that we're going to modify outside the + // txn. This ensures that refreshing the txn will not succeed, so a restart + // will be necessary. + txn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn) + if _, err := txn.Get(ctx, readKey); err != nil { + t.Fatal(err) + } + + // Write the read key outside of the + // transaction, at a higher timestamp, which will necessitate a + // txn restart when the original read key span is updated. + if err := s.DB.Put(ctx, readKey, value); err != nil { + t.Fatal(err) + } + + // Outside of the transaction, read the same key as will be + // written within the transaction. This means that future + // attempts to write will forward the txn timestamp. + if _, err := s.DB.Get(ctx, writeKey); err != nil { + t.Fatal(err) + } + + // This put will lay down an intent, txn timestamp will be bumped. + if err := txn.Put(ctx, writeKey, value); err != nil { + t.Fatal(err) + } + proto := txn.Serialize() + if !proto.OrigTimestamp.Less(proto.Timestamp) { + t.Errorf("expected timestamp to increase: %s", proto) + } + + // Attempt to commit, expect a retriable error. + err := txn.Commit(ctx) + assertTransactionRetryError(t, err) + + // Attempt to commit again, at the next epoch. This should succeed, and the + // commit should be turned to a rollback. + tr := tracing.NewTracer() + sp := tr.StartSpan("test", tracing.Recordable) + tracing.StartRecording(sp, tracing.SingleNodeRecording) + commitCtx := opentracing.ContextWithSpan(ctx, sp) + err = txn.Commit(commitCtx) + sp.Finish() + if err != nil { + t.Fatal(err) + } + + // Check that the commit metric has been incremented. + checkTxnMetrics(t, metrics, "commit turned to rollback", + 1 /* commits */, 0, 0, 0, 1 /* restarts */) + + // Check that the proto's status is the expected one for a commit. + if s := txn.Serialize().Status; s != roachpb.COMMITTED { + t.Fatalf("expected COMMITTED status, got: %s", s) + } + + // Look for a specific log message indicating that this test is not fooling + // itself and indeed we transformed a commit to a rollback. + var found bool + for _, recSp := range tracing.GetRecording(sp) { + msg := "" + for _, l := range recSp.Logs { + for _, f := range l.Fields { + msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value) + } + } + if strings.Contains(msg, turningCommitToRollbackMsg) { + found = true + break + } + } + if !found { + t.Fatalf("didn't find trace message: %s", turningCommitToRollbackMsg) + } +} diff --git a/pkg/kv/txn_interceptor_heartbeat.go b/pkg/kv/txn_interceptor_heartbeat.go index a665cca068d5..d695fa3bfd14 100644 --- a/pkg/kv/txn_interceptor_heartbeat.go +++ b/pkg/kv/txn_interceptor_heartbeat.go @@ -27,6 +27,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" ) +const ( + turningCommitToRollbackMsg string = "Turning commit to rollback. All writes are part of old epochs." +) + // txnHeartbeat is a txnInterceptor in charge of the txn's heartbeat loop. // The heartbeat loop is started upon the first write. txnHeartbeat is also in // charge of prepending a BeginTransaction to the first write batch and possibly @@ -187,20 +191,22 @@ func (h *txnHeartbeat) SendLocked( // See if we can elide an EndTxn. We can elide it for read-only transactions. lastIndex := int32(len(ba.Requests) - 1) var elideEndTxn bool + var commitTurnedToRollback bool if haveEndTxn { // Are we writing now or have we written in the past? elideEndTxn = !h.mu.everSentBeginTxn if elideEndTxn { ba.Requests = ba.Requests[:lastIndex] - } else { + } else if etReq.Commit { // If all the writes were part of old epochs, we can turn the commit into // a rollback. Besides the rollback being potentially cheaper, this // transformation is important in situations where it's unclear if the txn // record exist: if it doesn't, then a commit would return a // TransactionStatusError where a rollback returns success. if h.mu.needBeginTxn { - log.VEventf(ctx, 2, "Turning commit in rollback. All writes are part of old epochs.") + log.VEventf(ctx, 2, turningCommitToRollbackMsg) etReq.Commit = false + commitTurnedToRollback = true } } } @@ -268,6 +274,15 @@ func (h *txnHeartbeat) SendLocked( resp := &roachpb.EndTransactionResponse{} resp.Txn = br.Txn br.Add(resp) + } else if commitTurnedToRollback { + // If we transformed a commit into a rollback, flip the status so that it + // looks like a successful commit to the higher layers. In particular, the + // SQL module looks at this status and wants it to be COMMITTED after a "1pc + // planNode" runs. + // + // Note: if we sent an EndTransaction and got back a successful response, we + // expect br.Txn to be filled. + br.Txn.Status = roachpb.COMMITTED } return br, nil