Skip to content

Commit

Permalink
kv: lie better about commits that are really rollbacks
Browse files Browse the repository at this point in the history
When a client tries to commit a txn that has performed writes at an old
epoch but has only done reads at the current epoch, one of the
TxnCoordSender interceptors turns the commit into a rollback (for
reasons described in the code).
This patch completes that interceptor's lie by updating the txn status
upon success to COMMITTED instead of ABORTED. Since a commit is what the
client asked for, it seems sane to pretend as best we can that that's
what it got. In particular, this is important for the sql module, where
the ConnExecutor looks at the txn proto's status to discriminate between
cases where a "1pc planNode" already committed an implicit txn versus
situations where it needs to commit it itself. This was causing the
executor to think the txn was not committed and to attempt to commit
again, which resulted in an error.
I don't know if we like the ConnExecutor looking at the proto status,
but I'll leave that alone.

Fixes cockroachdb#28554
Fixes cockroachdb#28796

Release note: None
  • Loading branch information
andreimatei committed Aug 21, 2018
1 parent 9dddc4c commit 9964dd9
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 2 deletions.
96 changes: 96 additions & 0 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -2382,3 +2384,97 @@ func TestAnchorKey(t *testing.T) {
t.Fatal(err)
}
}

// TestCommitTurnedToRollback tests that the TxnCoordSender (or, rather, one of
// the interceptors) turns a commit into a rollback in situations where a txn
// has performed writes at old epochs but no writes at the current epoch. See
// the comment in txnHeartbeat about why this is needed.
//
// The test checks two things: that the transformation really happened (by
// searching a trace recording for turningCommitToRollbackMsg), and that the
// externally visible state (the commit metric and the txn proto status) looks
// like a committed transaction even though a rollback was technically
// performed.
func TestCommitTurnedToRollback(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	s, metrics, cleanupFn := setupMetricsTest(t)
	defer cleanupFn()

	readKey := []byte("read")
	writeKey := []byte("write")
	value := []byte("value")

	// Start a transaction and read a key that we're going to modify outside the
	// txn. This ensures that refreshing the txn will not succeed, so a restart
	// will be necessary.
	txn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn)
	if _, err := txn.Get(ctx, readKey); err != nil {
		t.Fatal(err)
	}

	// Write the read key outside of the transaction, at a higher timestamp,
	// which will necessitate a txn restart when the original read key span is
	// updated.
	if err := s.DB.Put(ctx, readKey, value); err != nil {
		t.Fatal(err)
	}

	// Outside of the transaction, read the same key as will be written within
	// the transaction. This means that future attempts to write will forward
	// the txn timestamp.
	if _, err := s.DB.Get(ctx, writeKey); err != nil {
		t.Fatal(err)
	}

	// This put will lay down an intent, txn timestamp will be bumped.
	if err := txn.Put(ctx, writeKey, value); err != nil {
		t.Fatal(err)
	}
	proto := txn.Serialize()
	if !proto.OrigTimestamp.Less(proto.Timestamp) {
		t.Errorf("expected timestamp to increase: %s", proto)
	}

	// Attempt to commit, expect a retriable error.
	err := txn.Commit(ctx)
	assertTransactionRetryError(t, err)

	// Attempt to commit again, at the next epoch. This should succeed, and the
	// commit should be turned to a rollback. The commit runs under a recording
	// span so that the trace can be inspected afterwards.
	tr := tracing.NewTracer()
	sp := tr.StartSpan("test", tracing.Recordable)
	tracing.StartRecording(sp, tracing.SingleNodeRecording)
	commitCtx := opentracing.ContextWithSpan(ctx, sp)
	err = txn.Commit(commitCtx)
	sp.Finish()
	if err != nil {
		t.Fatal(err)
	}

	// Check that the commit metric has been incremented.
	checkTxnMetrics(t, metrics, "commit turned to rollback",
		1 /* commits */, 0, 0, 0, 1 /* restarts */)

	// Check that the proto's status is the expected one for a commit. The
	// local is named status (not s) to avoid shadowing the value returned by
	// setupMetricsTest above.
	if status := txn.Serialize().Status; status != roachpb.COMMITTED {
		t.Fatalf("expected COMMITTED status, got: %s", status)
	}

	// Look for a specific log message indicating that this test is not fooling
	// itself and indeed we transformed a commit to a rollback. All log fields
	// of a recorded span are flattened into one string before searching.
	var found bool
	for _, recSp := range tracing.GetRecording(sp) {
		var msg strings.Builder
		for _, l := range recSp.Logs {
			for _, f := range l.Fields {
				fmt.Fprintf(&msg, " %s: %v", f.Key, f.Value)
			}
		}
		if strings.Contains(msg.String(), turningCommitToRollbackMsg) {
			found = true
			break
		}
	}
	if !found {
		t.Fatalf("didn't find trace message: %s", turningCommitToRollbackMsg)
	}
}
19 changes: 17 additions & 2 deletions pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// turningCommitToRollbackMsg is the trace event text logged when an
// EndTransaction commit is converted into a rollback. It is a named constant
// so that tests can search trace recordings for the exact same string.
const turningCommitToRollbackMsg string = "Turning commit to rollback. All writes are part of old epochs."

// txnHeartbeat is a txnInterceptor in charge of the txn's heartbeat loop.
// The heartbeat loop is started upon the first write. txnHeartbeat is also in
// charge of prepending a BeginTransaction to the first write batch and possibly
Expand Down Expand Up @@ -187,20 +191,22 @@ func (h *txnHeartbeat) SendLocked(
// See if we can elide an EndTxn. We can elide it for read-only transactions.
lastIndex := int32(len(ba.Requests) - 1)
var elideEndTxn bool
var commitTurnedToRollback bool
if haveEndTxn {
// Are we writing now or have we written in the past?
elideEndTxn = !h.mu.everSentBeginTxn
if elideEndTxn {
ba.Requests = ba.Requests[:lastIndex]
} else {
} else if etReq.Commit {
// If all the writes were part of old epochs, we can turn the commit into
// a rollback. Besides the rollback being potentially cheaper, this
// transformation is important in situations where it's unclear if the txn
// record exists: if it doesn't, then a commit would return a
// TransactionStatusError where a rollback returns success.
if h.mu.needBeginTxn {
log.VEventf(ctx, 2, "Turning commit in rollback. All writes are part of old epochs.")
log.VEventf(ctx, 2, turningCommitToRollbackMsg)
etReq.Commit = false
commitTurnedToRollback = true
}
}
}
Expand Down Expand Up @@ -268,6 +274,15 @@ func (h *txnHeartbeat) SendLocked(
resp := &roachpb.EndTransactionResponse{}
resp.Txn = br.Txn
br.Add(resp)
} else if commitTurnedToRollback {
// If we transformed a commit into a rollback, flip the status so that it
// looks like a successful commit to the higher layers. In particular, the
// SQL module looks at this status and wants it to be COMMITTED after a "1pc
// planNode" runs.
//
// Note: if we sent an EndTransaction and got back a successful response, we
// expect br.Txn to be filled.
br.Txn.Status = roachpb.COMMITTED
}

return br, nil
Expand Down

0 comments on commit 9964dd9

Please sign in to comment.