diff --git a/pkg/ccl/followerreadsccl/followerreads.go b/pkg/ccl/followerreadsccl/followerreads.go
index 33ab16ccabef..4ae7fd3efed6 100644
--- a/pkg/ccl/followerreadsccl/followerreads.go
+++ b/pkg/ccl/followerreadsccl/followerreads.go
@@ -93,7 +93,7 @@ func canUseFollowerRead(clusterID uuid.UUID, st *cluster.Settings, ts hlc.Timest
 // canSendToFollower implements the logic for checking whether a batch request
 // may be sent to a follower.
 func canSendToFollower(clusterID uuid.UUID, st *cluster.Settings, ba roachpb.BatchRequest) bool {
-	return ba.IsReadOnly() && ba.Txn != nil &&
+	return ba.IsReadOnly() && ba.Txn != nil && !ba.Txn.IsWriting() &&
 		canUseFollowerRead(clusterID, st, ba.Txn.OrigTimestamp)
 }
 
diff --git a/pkg/ccl/followerreadsccl/followerreads_test.go b/pkg/ccl/followerreadsccl/followerreads_test.go
index 4e3b6e93303a..44d6bde6c560 100644
--- a/pkg/ccl/followerreadsccl/followerreads_test.go
+++ b/pkg/ccl/followerreadsccl/followerreads_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlplan/replicaoracle"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -79,6 +80,16 @@ func TestCanSendToFollower(t *testing.T) {
 	if !canSendToFollower(uuid.MakeV4(), st, roOld) {
 		t.Fatalf("should be able to send an old ro batch to a follower")
 	}
+	roRWTxnOld := roachpb.BatchRequest{Header: roachpb.Header{
+		Txn: &roachpb.Transaction{
+			TxnMeta:       enginepb.TxnMeta{Key: []byte("key")},
+			OrigTimestamp: old,
+		},
+	}}
+	roRWTxnOld.Add(&roachpb.GetRequest{})
+	if canSendToFollower(uuid.MakeV4(), st, roRWTxnOld) {
+		t.Fatalf("should not be able to send a ro request from a rw txn to a follower")
+	}
 	storage.FollowerReadsEnabled.Override(&st.SV, false)
 	if canSendToFollower(uuid.MakeV4(), st, roOld) {
 		t.Fatalf("should not be able to send an old ro batch to a follower when follower reads are disabled")
diff --git a/pkg/storage/closed_timestamp_test.go b/pkg/storage/closed_timestamp_test.go
index 174d16105152..0b167dd11639 100644
--- a/pkg/storage/closed_timestamp_test.go
+++ b/pkg/storage/closed_timestamp_test.go
@@ -19,6 +19,7 @@ import (
 	gosql "database/sql"
 	"fmt"
 	"math/rand"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -265,6 +266,60 @@ func getTableID(db *gosql.DB, dbName, tableName string) (tableID sqlbase.ID, err
 	return
 }
 
+func TestClosedTimestampCantServeForWritingTransaction(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	if util.RaceEnabled {
+		// Limiting how long transactions can run does not work
+		// well with race unless we're extremely lenient, which
+		// drives up the test duration.
+		t.Skip("skipping under race")
+	}
+
+	ctx := context.Background()
+	tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
+	defer tc.Stopper().Stop(ctx)
+
+	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify that we can serve a follower read at a timestamp. Wait if necessary.
+	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+	testutils.SucceedsSoon(t, func() error {
+		return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1))
+	})
+
+	// Create a read-only batch and attach a read-write transaction.
+	rwTxn := roachpb.MakeTransaction("test", []byte("key"), roachpb.NormalUserPriority, ts, 0)
+	baRead := makeReadBatchRequestForDesc(desc, ts)
+	baRead.Txn = &rwTxn
+
+	// Send the request to all three replicas. One should succeed and
+	// the other two should return NotLeaseHolderErrors.
+	g, ctx := errgroup.WithContext(ctx)
+	var notLeaseholderErrs int64
+	for i := range repls {
+		repl := repls[i]
+		g.Go(func() (err error) {
+			if _, pErr := repl.Send(ctx, baRead); pErr != nil {
+				if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok {
+					atomic.AddInt64(&notLeaseholderErrs, 1)
+					return nil
+				}
+				return pErr.GetDetail()
+			}
+			return nil
+		})
+	}
+	if err := g.Wait(); err != nil {
+		t.Fatal(err)
+	}
+	if a, e := notLeaseholderErrs, int64(2); a != e {
+		t.Fatalf("expected %d NotLeaseHolderError; found %d", e, a)
+	}
+}
+
 // Every 0.1s=100ms, try close out a timestamp ~300ms in the past.
 // We don't want to be more aggressive than that since it's also
 // a limit on how long transactions can run.
@@ -451,17 +506,15 @@ func verifyCanReadFromAllRepls(
 	g, ctx := errgroup.WithContext(ctx)
 	for i := range repls {
 		repl := repls[i]
-		func(r *storage.Replica) {
-			g.Go(func() (err error) {
-				var shouldRetry bool
-				for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() {
-					if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry {
-						return err
-					}
+		g.Go(func() (err error) {
+			var shouldRetry bool
+			for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() {
+				if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry {
+					return err
 				}
-				return err
-			})
-		}(repls[i])
+			}
+			return err
+		})
 	}
 	return g.Wait()
 }
diff --git a/pkg/storage/replica_follower_read.go b/pkg/storage/replica_follower_read.go
index 7ec03d95f927..ab1dfd1f682c 100644
--- a/pkg/storage/replica_follower_read.go
+++ b/pkg/storage/replica_follower_read.go
@@ -43,8 +43,9 @@ func (r *Replica) canServeFollowerRead(
 ) *roachpb.Error {
 	canServeFollowerRead := false
 	if lErr, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); ok &&
+		lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch &&
 		FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) &&
-		lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch {
+		(ba.Txn == nil || !ba.Txn.IsWriting()) {
 		canServeFollowerRead = !r.maxClosed(ctx).Less(ba.Timestamp)
 		if !canServeFollowerRead {