Skip to content

Commit

Permalink
Merge #28911
Browse files Browse the repository at this point in the history
28911: release-2.1: kv: lie better about commits that are really rollbacks r=andreimatei a=andreimatei

cc @cockroachdb/release 

Backport #28872

When a client tries to commit a txn that has performed writes at an old
epoch but has only done reads at the current epoch, one of the
TxnCoordSender interceptors turns the commit into a rollback (for
reasons described in the code).
This patch completes that interceptor's lie by updating the txn status
upon success to COMMITTED instead of ABORTED. Since a commit is what the
client asked for, it seems sane to pretend as best we can that that's
what it got. In particular, this is important for the sql module, where
the ConnExecutor looks at the txn proto's status to discriminate between
cases where a "1pc planNode" already committed an implicit txn versus
situations where it needs to commit it itself. This was causing the
executor to think the txn was not committed and to attempt to commit
again, which resulted in an error.
I don't know if we like the ConnExecutor looking at the proto status,
but I'll leave that alone.

Fixes #28554
Fixes #28796

Release note: None

Co-authored-by: Andrei Matei <[email protected]>
  • Loading branch information
craig[bot] and andreimatei committed Aug 21, 2018
2 parents 8ff17b6 + 65d0f33 commit dab7a98
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 53 deletions.
188 changes: 137 additions & 51 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1171,63 +1173,53 @@ func TestTxnRestartCount(t *testing.T) {
value := []byte("value")
ctx := context.Background()

for _, expRestart := range []bool{true, false} {
t.Run(fmt.Sprintf("expected restart:%t", expRestart), func(t *testing.T) {
s, metrics, cleanupFn := setupMetricsTest(t)
defer cleanupFn()
s, metrics, cleanupFn := setupMetricsTest(t)
defer cleanupFn()

// Start a transaction and do a GET. This forces a timestamp to be
// chosen for the transaction.
txn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn)
if _, err := txn.Get(ctx, readKey); err != nil {
t.Fatal(err)
}
// Start a transaction and read a key that we're going to modify outside the
// txn. This ensures that refreshing the txn will not succeed, so a restart
// will be necessary.
txn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn)
if _, err := txn.Get(ctx, readKey); err != nil {
t.Fatal(err)
}

// If expRestart is true, write the read key outside of the
// transaction, at a higher timestamp, which will necessitate a
// txn restart when the original read key span is updated.
if expRestart {
if err := s.DB.Put(ctx, readKey, value); err != nil {
t.Fatal(err)
}
}
// Write the read key outside of the transaction, at a higher timestamp, which
// will necessitate a txn restart when the original read key span is updated.
if err := s.DB.Put(ctx, readKey, value); err != nil {
t.Fatal(err)
}

// Outside of the transaction, read the same key as will be
// written within the transaction. This means that future
// attempts to write will forward the txn timestamp.
if _, err := s.DB.Get(ctx, writeKey); err != nil {
t.Fatal(err)
}
// Outside of the transaction, read the same key as will be
// written within the transaction. This means that future
// attempts to write will forward the txn timestamp.
if _, err := s.DB.Get(ctx, writeKey); err != nil {
t.Fatal(err)
}

// This put will lay down an intent, txn timestamp will increase
// beyond OrigTimestamp.
if err := txn.Put(ctx, writeKey, value); err != nil {
t.Fatal(err)
}
proto := txn.Serialize()
if !proto.OrigTimestamp.Less(proto.Timestamp) {
t.Errorf("expected timestamp to increase: %s", proto)
}
// This put will lay down an intent, txn timestamp will increase
// beyond OrigTimestamp.
if err := txn.Put(ctx, writeKey, value); err != nil {
t.Fatal(err)
}
proto := txn.Serialize()
if !proto.OrigTimestamp.Less(proto.Timestamp) {
t.Errorf("expected timestamp to increase: %s", proto)
}

// Wait for heartbeat to start.
tc := txn.Sender().(*TxnCoordSender)
testutils.SucceedsSoon(t, func() error {
if !tc.isTracking() {
return errors.New("expected heartbeat to start")
}
return nil
})
// Wait for heartbeat to start.
tc := txn.Sender().(*TxnCoordSender)
testutils.SucceedsSoon(t, func() error {
if !tc.isTracking() {
return errors.New("expected heartbeat to start")
}
return nil
})

// Commit (should cause restart metric to increase).
err := txn.CommitOrCleanup(ctx)
if expRestart {
assertTransactionRetryError(t, err)
checkTxnMetrics(t, metrics, "restart txn", 0, 0, 0, 1, 1)
} else if err != nil {
t.Fatalf("expected no restart; got %s", err)
}
})
}
// Commit (should cause restart metric to increase).
err := txn.CommitOrCleanup(ctx)
assertTransactionRetryError(t, err)
checkTxnMetrics(t, metrics, "restart txn", 0, 0, 0, 1 /* aborts */, 1 /* restarts */)
}

func TestTxnDurations(t *testing.T) {
Expand Down Expand Up @@ -2382,3 +2374,97 @@ func TestAnchorKey(t *testing.T) {
t.Fatal(err)
}
}

// TestCommitTurnedToRollback verifies that a commit is transformed into a
// rollback (by one of the TxnCoordSender's interceptors) when the txn wrote at
// an old epoch but has no writes at the current one. See the comment in
// txnHeartbeat about why this is needed.
//
// The test asserts two things: that the transformation actually took place
// (by searching a recorded trace for the corresponding message), and that the
// outcome nevertheless looks like a commit to the client (commit metric and
// proto status), even though a rollback is what was technically performed.
func TestCommitTurnedToRollback(t *testing.T) {
	defer leaktest.AfterTest(t)()

	s, metrics, cleanupFn := setupMetricsTest(t)
	defer cleanupFn()
	ctx := context.Background()

	var (
		readKey  = []byte("read")
		writeKey = []byte("write")
		value    = []byte("value")
	)

	// Begin a txn and have it read a key that gets modified outside of the txn
	// below. That makes a refresh of the txn fail, so a restart is required.
	txn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn)
	if _, err := txn.Get(ctx, readKey); err != nil {
		t.Fatal(err)
	}

	// Non-transactional write to the key read above, at a higher timestamp.
	// This necessitates a txn restart when the original read key span is
	// updated.
	if err := s.DB.Put(ctx, readKey, value); err != nil {
		t.Fatal(err)
	}

	// Non-transactional read of the key the txn is about to write. Future
	// attempts to write it will forward the txn timestamp.
	if _, err := s.DB.Get(ctx, writeKey); err != nil {
		t.Fatal(err)
	}

	// Lay down an intent; the txn timestamp gets bumped as a result.
	if err := txn.Put(ctx, writeKey, value); err != nil {
		t.Fatal(err)
	}
	if proto := txn.Serialize(); !proto.OrigTimestamp.Less(proto.Timestamp) {
		t.Errorf("expected timestamp to increase: %s", proto)
	}

	// The first commit attempt is expected to fail with a retriable error.
	assertTransactionRetryError(t, txn.Commit(ctx))

	// Commit once more, now at the next epoch, while recording a trace. This
	// attempt should succeed, with the commit being turned into a rollback.
	tr := tracing.NewTracer()
	sp := tr.StartSpan("test", tracing.Recordable)
	tracing.StartRecording(sp, tracing.SingleNodeRecording)
	commitCtx := opentracing.ContextWithSpan(ctx, sp)
	err := txn.Commit(commitCtx)
	sp.Finish()
	if err != nil {
		t.Fatal(err)
	}

	// The commit metric must have been incremented.
	checkTxnMetrics(t, metrics, "commit turned to rollback",
		1 /* commits */, 0, 0, 0, 1 /* restarts */)

	// The proto must carry the status expected after a commit.
	if status := txn.Serialize().Status; status != roachpb.COMMITTED {
		t.Fatalf("expected COMMITTED status, got: %s", status)
	}

	// Make sure this test is not fooling itself: the recording must contain
	// the log message proving that the commit was transformed into a rollback.
	// All log fields of a recorded span are flattened into one string, which is
	// then searched for the message.
	sawRollbackMsg := func() bool {
		for _, recorded := range tracing.GetRecording(sp) {
			var flattened strings.Builder
			for _, logEntry := range recorded.Logs {
				for _, field := range logEntry.Fields {
					fmt.Fprintf(&flattened, " %s: %v", field.Key, field.Value)
				}
			}
			if strings.Contains(flattened.String(), turningCommitToRollbackMsg) {
				return true
			}
		}
		return false
	}
	if !sawRollbackMsg() {
		t.Fatalf("didn't find trace message: %s", turningCommitToRollbackMsg)
	}
}
24 changes: 22 additions & 2 deletions pkg/kv/txn_interceptor_heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// turningCommitToRollbackMsg is the trace event message logged when a commit
// is transformed into a rollback because all of the txn's writes were part of
// old epochs. It is a named constant so that tests can search a recording for
// exactly this text.
const turningCommitToRollbackMsg string = "Turning commit to rollback. All writes are part of old epochs."

// txnHeartbeat is a txnInterceptor in charge of the txn's heartbeat loop.
// The heartbeat loop is started upon the first write. txnHeartbeat is also in
// charge of prepending a BeginTransaction to the first write batch and possibly
Expand Down Expand Up @@ -187,20 +191,22 @@ func (h *txnHeartbeat) SendLocked(
// See if we can elide an EndTxn. We can elide it for read-only transactions.
lastIndex := int32(len(ba.Requests) - 1)
var elideEndTxn bool
var commitTurnedToRollback bool
if haveEndTxn {
// Are we writing now or have we written in the past?
elideEndTxn = !h.mu.everSentBeginTxn
if elideEndTxn {
ba.Requests = ba.Requests[:lastIndex]
} else {
} else if etReq.Commit {
// If all the writes were part of old epochs, we can turn the commit into
// a rollback. Besides the rollback being potentially cheaper, this
// transformation is important in situations where it's unclear if the txn
// record exists: if it doesn't, then a commit would return a
// TransactionStatusError where a rollback returns success.
if h.mu.needBeginTxn {
log.VEventf(ctx, 2, "Turning commit in rollback. All writes are part of old epochs.")
log.VEventf(ctx, 2, turningCommitToRollbackMsg)
etReq.Commit = false
commitTurnedToRollback = true
}
}
}
Expand Down Expand Up @@ -262,12 +268,26 @@ func (h *txnHeartbeat) SendLocked(
if br.Txn == nil {
txn := ba.Txn.Clone()
br.Txn = &txn
} else {
clone := br.Txn.Clone()
br.Txn = &clone
}
br.Txn.Status = status
// Synthesize an EndTransactionResponse.
resp := &roachpb.EndTransactionResponse{}
resp.Txn = br.Txn
br.Add(resp)
} else if commitTurnedToRollback {
// If we transformed a commit into a rollback, flip the status so that it
// looks like a successful commit to the higher layers. In particular, the
// SQL module looks at this status and wants it to be COMMITTED after a "1pc
// planNode" runs.
//
// Note: if we sent an EndTransaction and got back a successful response, we
// expect br.Txn to be filled.
clone := br.Txn.Clone()
br.Txn = &clone
br.Txn.Status = roachpb.COMMITTED
}

return br, nil
Expand Down

0 comments on commit dab7a98

Please sign in to comment.