Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: disallow follower reads for writing transactions #35969

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/followerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timest
// canSendToFollower implements the logic for checking whether a batch request
// may be sent to a follower.
func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
return ba.IsReadOnly() && ba.Txn != nil &&
return ba.IsReadOnly() && ba.Txn != nil && !ba.Txn.IsWriting() &&
canUseFollowerRead(clusterID, st, ba.Txn.OrigTimestamp)
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/followerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -79,6 +80,16 @@ func TestCanSendToFollower(t *testing.T) {
if !canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should be able to send an old ro batch to a follower")
}
roRWTxnOld := roachpb.BatchRequest{Header: roachpb.Header{
Txn: &roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Key: []byte("key")},
OrigTimestamp: old,
},
}}
roRWTxnOld.Add(&roachpb.GetRequest{})
if canSendToFollower(uuid.MakeV4(), st, roRWTxnOld) {
t.Fatalf("should not be able to send a ro request from a rw txn to a follower")
}
storage.FollowerReadsEnabled.Override(&st.SV, false)
if canSendToFollower(uuid.MakeV4(), st, roOld) {
t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled")
Expand Down
73 changes: 63 additions & 10 deletions pkg/storage/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
gosql "database/sql"
"fmt"
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -265,6 +266,60 @@ func getTableID(db *gosql.DB, dbName, tableName string) (tableID sqlbase.ID, err
return
}

func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) {
defer leaktest.AfterTest(t)()

if util.RaceEnabled {
// Limiting how long transactions can run does not work
// well with race unless we're extremely lenient, which
// drives up the test duration.
t.Skip("skipping under race")
}

ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}

// Verify that we can serve a follower read at a timestamp. Wait if necessary.
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1))
})

// Create a read-only batch and attach a read-write transaction.
rwTxn := roachpb.MakeTransaction("test", []byte("key"), roachpb.NormalUserPriority, ts, 0)
baRead := makeReadBatchRequestForDesc(desc, ts)
baRead.Txn = &rwTxn

// Send the request to all three replicas. One should succeed and
// the other two should return NotLeaseHolderErrors.
g, ctx := errgroup.WithContext(ctx)
var notLeaseholderErrs int64
for i := range repls {
repl := repls[i]
g.Go(func() (err error) {
if _, pErr := repl.Send(ctx, baRead); pErr != nil {
if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
atomic.AddInt64(&notLeaseholderErrs, 1)
return nil
}
return pErr.GetDetail()
}
return nil
})
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
if a, e := notLeaseholderErrs, int64(2); a != e {
t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a)
}
}

// Every 0.1s=100ms, try close out a timestamp ~300ms in the past.
// We don't want to be more aggressive than that since it's also
// a limit on how long transactions can run.
Expand Down Expand Up @@ -451,17 +506,15 @@ func verifyCanReadFromAllRepls(
g, ctx := errgroup.WithContext(ctx)
for i := range repls {
repl := repls[i]
func(r *storage.Replica) {
g.Go(func() (err error) {
var shouldRetry bool
for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() {
if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry {
return err
}
g.Go(func() (err error) {
var shouldRetry bool
for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() {
if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry {
return err
}
return err
})
}(repls[i])
}
return err
})
}
return g.Wait()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func (r *Replica) canServeFollowerRead(
) *roachpb.Error {
canServeFollowerRead := false
if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch &&
FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) &&
lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch {
(ba.Txn == nil || !ba.Txn.IsWriting()) {

canServeFollowerRead = !r.maxClosed(ctx).Less(ba.Timestamp)
if !canServeFollowerRead {
Expand Down