Skip to content

Commit

Permalink
kv: disallow follower reads for writing transactions
Browse files Browse the repository at this point in the history
Fixes #35812.

To avoid missing its own writes, a transaction must not evaluate a read
on a follower who has nit caught up to at least its current provisional
commit timestamp. We were violating this both at the DistSender level and
at the Replica level.

Because the ability to perform follower reads in a writing transaction is
fairly unimportant and has these known issues, this commit disallows
follower reads for writing transactions.

Release note: None
  • Loading branch information
nvanbenschoten committed Mar 21, 2019
1 parent 63f0658 commit 93a4113
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/followerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timest
// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return ba.IsReadOnly() && ba.Txn != nil &&
return ba.IsReadOnly() && ba.Txn != nil && !ba.Txn.IsWriting() &&
canUseFollowerRead(clusterID, st, ba.Txn.OrigTimestamp)
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/followerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -79,6 +80,16 @@ func TestCanSendToFollower(t *testing.T) {
if !canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should be able to send an old ro batch to a follower")
}
roRWTxnOld := roachpb.BatchRequest{Header: roachpb.Header{
Txn: &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Key: []byte("key")},
OrigTimestamp: old,
},
}}
roRWTxnOld.Add(&roachpb.GetRequest{})
if canSendToFollower(uuid.MakeV4(), st, roRWTxnOld) {
t.Fatalf("should not be able to send a ro request from a rw txn to a follower")
}
storage.FollowerReadsEnabled.Override(&st.SV, false)
if canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled")
Expand Down
73 changes: 63 additions & 10 deletions pkg/storage/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -265,6 +266,60 @@ func getTableID(db *gosql.DB, dbName, tableName string) (tableID sqlbase.ID, err
return
}

func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) {
defer leaktest.AfterTest(t)()

if util.RaceEnabled {
// Limiting how long transactions can run does not work
// well with race unless we're extremely lenient, which
// drives up the test duration.
t.Skip("skipping under race")
}

ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}

// Verify that we can serve a follower read at a timestamp. Wait if necessary.
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1))
})

// Create a read-only batch and attach a read-write transaction.
rwTxn := roachpb.MakeTransaction("test", []byte("key"), roachpb.NormalUserPriority, ts, 0)
baRead := makeReadBatchRequestForDesc(desc, ts)
baRead.Txn = &rwTxn

// Send the request to all three replicas. One should succeed and
// the other two should return NotLeaseHolderErrors.
g, ctx := errgroup.WithContext(ctx)
var notLeaseholderErrs int64
for i := range repls {
repl := repls[i]
g.Go(func() (err error) {
if _, pErr := repl.Send(ctx, baRead); pErr != nil {
if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
atomic.AddInt64(&notLeaseholderErrs, 1)
return nil
}
return pErr.GetDetail()
}
return nil
})
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
if a, e := notLeaseholderErrs, int64(2); a != e {
t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a)
}
}

// Every 0.1s=100ms, try close out a timestamp ~300ms in the past.
// We don't want to be more aggressive than that since it's also
// a limit on how long transactions can run.
Expand Down Expand Up @@ -451,17 +506,15 @@ func verifyCanReadFromAllRepls(
g, ctx := errgroup.WithContext(ctx)
for i := range repls {
repl := repls[i]
func(r *storage.Replica) {
g.Go(func() (err error) {
var shouldRetry bool
for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() {
if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry {
return err
}
g.Go(func() (err error) {
var shouldRetry bool
for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() {
if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry {
return err
}
return err
})
}(repls[i])
}
return err
})
}
return g.Wait()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func (r *Replica) canServeFollowerRead(
) *roachpb.Error {
canServeFollowerRead := false
if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch &&
FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) &&
lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch {
(ba.Txn == nil || !ba.Txn.IsWriting()) {

canServeFollowerRead = !r.maxClosed(ctx).Less(ba.Timestamp)
if !canServeFollowerRead {
Expand Down

0 comments on commit 93a4113

Please sign in to comment.