diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 4f35771a77ae..87c10e692a08 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -587,6 +587,24 @@ func (ts *TestServer) LookupRange(key roachpb.Key) (roachpb.RangeDescriptor, err return rs[0], nil } +// MergeRanges merges the range containing leftKey with the range to its right. +func (ts *TestServer) MergeRanges(leftKey roachpb.Key) (roachpb.RangeDescriptor, error) { + + ctx := context.Background() + mergeReq := roachpb.AdminMergeRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: leftKey, + }, + } + _, pErr := client.SendWrapped(ctx, ts.DB().NonTransactionalSender(), &mergeReq) + if pErr != nil { + return roachpb.RangeDescriptor{}, + errors.Errorf( + "%q: merge unexpected error: %s", leftKey, pErr) + } + return ts.LookupRange(leftKey) +} + // SplitRange splits the range containing splitKey. // The right range created by the split starts at the split key and extends to the // original range's end key. diff --git a/pkg/storage/closed_timestamp_test.go b/pkg/storage/closed_timestamp_test.go index 6497dae66483..174d16105152 100644 --- a/pkg/storage/closed_timestamp_test.go +++ b/pkg/storage/closed_timestamp_test.go @@ -16,21 +16,28 @@ package storage_test import ( "context" + gosql "database/sql" "fmt" + "math/rand" "testing" "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestClosedTimestampCanServe(t *testing.T) { @@ -44,17 +51,267 @@ func TestClosedTimestampCanServe(t *testing.T) { } ctx := context.Background() - const numNodes = 3 + tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t) + defer tc.Stopper().Stop(ctx) + + if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { + t.Fatal(err) + } + + ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + testutils.SucceedsSoon(t, func() error { + return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1)) + }) + + // We just served a follower read. As a sanity check, make sure that we can't write at + // that same timestamp. 
+	{
+		var baWrite roachpb.BatchRequest
+		r := &roachpb.DeleteRequest{}
+		r.Key = desc.StartKey.AsRawKey()
+		txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, ts, 100)
+		baWrite.Txn = &txn
+		baWrite.Add(r)
+		baWrite.RangeID = repls[0].RangeID
+		if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil {
+			t.Fatal(err)
+		}
+
+		var found bool
+		for _, repl := range repls {
+			resp, pErr := repl.Send(ctx, baWrite)
+			if _, ok := pErr.GoError().(*roachpb.NotLeaseHolderError); ok {
+				continue
+			} else if pErr != nil {
+				t.Fatal(pErr)
+			}
+			found = true
+			if !ts.Less(resp.Txn.Timestamp) || resp.Txn.OrigTimestamp == resp.Txn.Timestamp {
+				t.Fatal("timestamp did not get bumped")
+			}
+			break
+		}
+		if !found {
+			t.Fatal("unable to send to any replica")
+		}
+	}
+}
+
+// TestClosedTimestampCanServeThroughoutLeaseTransfer verifies that lease
+// transfers do not prevent reading a value from a follower that was
+// previously readable.
+func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	if util.RaceEnabled {
+		// Limiting how long transactions can run does not work
+		// well with race unless we're extremely lenient, which
+		// drives up the test duration.
+		t.Skip("skipping under race")
+	}
+	ctx := context.Background()
+	tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
+	defer tc.Stopper().Stop(ctx)
+
+	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
+		t.Fatal(err)
+	}
+	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+	testutils.SucceedsSoon(t, func() error {
+		return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(1))
+	})
+
+	// Once we know that we can read safely at this timestamp, we want to ensure
+	// that we can always read at this timestamp from all replicas even while
+	// lease transfers are ongoing. The test launches a goroutine to trigger
+	// transfers at random intervals of up to 50ms and ensures that there are no
+	// errors reading the same value from any replica throughout the duration of
+	// the test (testTime).
+	const testTime = 500 * time.Millisecond
+	const maxTransferWait = 50 * time.Millisecond
+	deadline := timeutil.Now().Add(testTime)
+	g, gCtx := errgroup.WithContext(ctx)
+	getCurrentLeaseholder := func() (lh roachpb.ReplicationTarget) {
+		testutils.SucceedsSoon(t, func() error {
+			var err error
+			lh, err = tc.FindRangeLeaseHolder(desc, nil)
+			return err
+		})
+		return lh
+	}
+	pickRandomTarget := func(lh roachpb.ReplicationTarget) (t roachpb.ReplicationTarget) {
+		for {
+			if t = tc.Target(rand.Intn(len(repls))); t != lh {
+				return t
+			}
+		}
+	}
+	transferLeasesRandomlyUntilDeadline := func() error {
+		for timeutil.Now().Before(deadline) {
+			lh := getCurrentLeaseholder()
+			target := pickRandomTarget(lh)
+			if err := tc.TransferRangeLease(desc, target); err != nil {
+				return err
+			}
+			time.Sleep(time.Duration(rand.Intn(int(maxTransferWait))))
+		}
+		return nil
+	}
+	g.Go(transferLeasesRandomlyUntilDeadline)
+
+	// Attempt to send read requests to a replica in a tight loop until the
+	// deadline is reached. If an error is seen on any replica then it is
+	// returned to the errgroup.
+	baRead := makeReadBatchRequestForDesc(desc, ts)
+	ensureCanReadFromReplicaUntilDeadline := func(r *storage.Replica) {
+		g.Go(func() error {
+			for timeutil.Now().Before(deadline) {
+				resp, pErr := r.Send(gCtx, baRead)
+				if pErr != nil {
+					return errors.Wrapf(pErr.GoError(), "on %s", r)
+				}
+				rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
+				// Should see the write.
+				if len(rows) != 1 {
+					return fmt.Errorf("expected one row, but got %d", len(rows))
+				}
+			}
+			return nil
+		})
+	}
+	for _, r := range repls {
+		ensureCanReadFromReplicaUntilDeadline(r)
+	}
+	if err := g.Wait(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestClosedTimestampCanServeAfterSplitAndMerges validates the invariant that
+// if a timestamp is safe for reading on both the left side and the right side
+// of a merge then it will be safe after the merge, and that if a timestamp is
+// safe for reading before the beginning of a split it will be safe on both
+// sides of the split.
+func TestClosedTimestampCanServeAfterSplitAndMerges(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	if util.RaceEnabled {
+		// Limiting how long transactions can run does not work
+		// well with race unless we're extremely lenient, which
+		// drives up the test duration.
+		t.Skip("skipping under race")
+	}
+	ctx := context.Background()
+	tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
+	// Disable the automatic merging.
+	if _, err := db0.Exec("SET CLUSTER SETTING kv.range_merge.queue_enabled = false"); err != nil {
+		t.Fatal(err)
+	}
-	tc := serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{})
 	defer tc.Stopper().Stop(ctx)
-	db0 := tc.ServerConn(0)
 
-	// Every 0.1s=100ms, try close out a timestamp ~300ms in the past.
-	// We don't want to be more aggressive than that since it's also
-	// a limit on how long transactions can run.
-	targetDuration := 300 * time.Millisecond
-	closeFraction := 0.3
+	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(3, $1)`, "foo"); err != nil {
+		t.Fatal(err)
+	}
+	// Start by ensuring that the values can be read from all replicas at ts.
+	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+	testutils.SucceedsSoon(t, func() error {
+		return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, expectRows(2))
+	})
+	// Manually split the table to have easier access to descriptors.
+	tableID, err := getTableID(db0, "cttest", "kv")
+	if err != nil {
+		t.Fatalf("failed to lookup ids: %v", err)
+	}
+	// Split the table at key 2.
+	k, err := sqlbase.EncodeTableKey(sqlbase.EncodeTableIDIndexID(nil, tableID, 1),
+		tree.NewDInt(2), encoding.Ascending)
+	if err != nil {
+		t.Fatalf("failed to encode key: %v", err)
+	}
+	lr, rr, err := tc.Server(0).SplitRange(k)
+	if err != nil {
+		t.Fatalf("failed to split range at key %v: %v", roachpb.Key(k), err)
+	}
+
+	// Ensure that we can perform follower reads from all replicas.
+	lRepls := replsForRange(ctx, t, tc, lr)
+	rRepls := replsForRange(ctx, t, tc, rr)
+	// Now immediately query both the ranges and verify that there's one value
+	// per range. We need to tolerate RangeNotFound as the split range may not
+	// have been created yet.
+ require.Nil(t, verifyCanReadFromAllRepls(ctx, t, lr, ts, lRepls, + respFuncs(retryOnRangeNotFound, expectRows(1)))) + require.Nil(t, verifyCanReadFromAllRepls(ctx, t, rr, ts, rRepls, + respFuncs(retryOnRangeNotFound, expectRows(1)))) + // Now merge the ranges back together and ensure that there's two values in + // the merged range. + merged, err := tc.Server(0).MergeRanges(lr.StartKey.AsRawKey()) + require.Nil(t, err) + mergedRepls := replsForRange(ctx, t, tc, merged) + // The hazard here is that a follower is not yet aware of the merge and will + // return an error. We'll accept that because a client wouldn't see that error + // from distsender. + require.Nil(t, verifyCanReadFromAllRepls(ctx, t, merged, ts, mergedRepls, + respFuncs(retryOnRangeKeyMismatch, expectRows(2)))) +} + +func getTableID(db *gosql.DB, dbName, tableName string) (tableID sqlbase.ID, err error) { + err = db.QueryRow(`SELECT table_id FROM crdb_internal.tables WHERE database_name = $1 AND name = $2`, + dbName, tableName).Scan(&tableID) + return +} + +// Every 0.1s=100ms, try close out a timestamp ~300ms in the past. +// We don't want to be more aggressive than that since it's also +// a limit on how long transactions can run. +const targetDuration = 300 * time.Millisecond +const closeFraction = 0.333 +const numNodes = 3 + +func replsForRange( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + desc roachpb.RangeDescriptor, +) (repls []*storage.Replica) { + testutils.SucceedsSoon(t, func() error { + repls = nil + for i := 0; i < numNodes; i++ { + repl, err := tc.Server(i).GetStores().(*storage.Stores).GetReplicaForRangeID(desc.RangeID) + if err != nil { + return err + } + if repl != nil { + repls = append(repls, repl) + } + } + return nil + }) + return repls +} + +// This function creates a test cluster that is prepared to exercise follower +// reads. The returned test cluster has follower reads enabled using the above +// targetDuration and closeFraction. In addition to the newly minted test +// cluster, this function returns a db handle to node 0, a range descriptor for +// the range used by the table `cttest.kv` and the replica objects corresponding +// to the replicas for the range. It is the caller's responsibility to Stop the +// Stopper on the returned test cluster when done. +func setupTestClusterForClosedTimestampTesting( + ctx context.Context, t *testing.T, +) ( + tc serverutils.TestClusterInterface, + db0 *gosql.DB, + kvTableDesc roachpb.RangeDescriptor, + repls []*storage.Replica, +) { + + tc = serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{}) + db0 = tc.ServerConn(0) if _, err := db0.Exec(fmt.Sprintf(` SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s'; @@ -106,24 +363,8 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING); break } } - - var repls []*storage.Replica - testutils.SucceedsSoon(t, func() error { - repls = nil - for i := 0; i < numNodes; i++ { - repl, err := tc.Server(i).GetStores().(*storage.Stores).GetReplicaForRangeID(desc.RangeID) - if err != nil { - return err - } - if repl != nil { - repls = append(repls, repl) - } - } - return nil - }) - + repls = replsForRange(ctx, t, tc, desc) require.Equal(t, numReplicas, len(repls)) - // Wait until we see an epoch based lease on our chosen range. This should // happen fairly quickly since we just transferred a lease (as a means to make // it epoch based). 
If the lease transfer fails, we'll be sitting out the lease
@@ -138,74 +379,102 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING);
 			}
 		}
 	}
+	return tc, db0, desc, repls
+}
 
-	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
-		t.Fatal(err)
-	}
-
-	var baRead roachpb.BatchRequest
-	baRead.Header.RangeID = desc.RangeID
-	r := &roachpb.ScanRequest{}
-	r.Key = desc.StartKey.AsRawKey()
-	r.EndKey = desc.EndKey.AsRawKey()
-	baRead.Add(r)
-	baRead.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+type respFunc func(*roachpb.BatchResponse, *roachpb.Error) (err error, shouldRetry bool)
 
-	// The read should succeed once enough time (~300ms, but it's difficult to
-	// assert on that) has passed - on all replicas!
-	testutils.SucceedsSoon(t, func() error {
-		for _, repl := range repls {
-			resp, pErr := repl.Send(ctx, baRead)
-			if pErr != nil {
-				switch tErr := pErr.GetDetail().(type) {
-				case *roachpb.NotLeaseHolderError:
-					return tErr
-				case *roachpb.RangeNotFoundError:
-					// Can happen during upreplication.
-					return tErr
-				default:
-					t.Fatal(errors.Wrapf(pErr.GoError(), "on %s", repl))
-				}
-			}
-			rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
-			// Should see the write.
-			if len(rows) != 1 {
-				t.Fatalf("expected one row, but got %d", len(rows))
+// respFuncs returns a respFunc that passes its arguments to each passed
+// func until one returns shouldRetry or a non-nil error.
+func respFuncs(funcs ...respFunc) respFunc {
+	return func(resp *roachpb.BatchResponse, pErr *roachpb.Error) (err error, shouldRetry bool) {
+		for _, f := range funcs {
+			err, shouldRetry = f(resp, pErr)
+			if err != nil || shouldRetry {
+				break
 			}
 		}
-		return nil
-	})
+		return err, shouldRetry
+	}
+}
 
-	// We just served a follower read. As a sanity check, make sure that we can't write at
-	// that same timestamp.
- { - var baWrite roachpb.BatchRequest - r := &roachpb.DeleteRequest{} - r.Key = desc.StartKey.AsRawKey() - txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, baRead.Timestamp, 100) - baWrite.Txn = &txn - baWrite.Add(r) - baWrite.RangeID = repls[0].RangeID - if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil { - t.Fatal(err) +func retryOnError(f func(*roachpb.Error) bool) respFunc { + return func(resp *roachpb.BatchResponse, pErr *roachpb.Error) (err error, shouldRetry bool) { + if pErr != nil && f(pErr) { + return nil, true } + return pErr.GoError(), false + } +} - var found bool - for _, repl := range repls { - resp, pErr := repl.Send(ctx, baWrite) - if _, ok := pErr.GoError().(*roachpb.NotLeaseHolderError); ok { - continue - } else if pErr != nil { - t.Fatal(pErr) - } - found = true - if !baRead.Timestamp.Less(resp.Txn.Timestamp) || resp.Txn.OrigTimestamp == resp.Txn.Timestamp { - t.Fatal("timestamp did not get bumped") - } - break +var retryOnRangeKeyMismatch = retryOnError(func(pErr *roachpb.Error) bool { + _, isRangeKeyMismatch := pErr.Detail.Value.(*roachpb.ErrorDetail_RangeKeyMismatch) + return isRangeKeyMismatch +}) + +var retryOnRangeNotFound = retryOnError(func(pErr *roachpb.Error) bool { + _, isRangeNotFound := pErr.Detail.Value.(*roachpb.ErrorDetail_RangeNotFound) + return isRangeNotFound +}) + +func expectRows(expectedRows int) respFunc { + return func(resp *roachpb.BatchResponse, pErr *roachpb.Error) (err error, shouldRetry bool) { + if pErr != nil { + return pErr.GoError(), false } - if !found { - t.Fatal("unable to send to any replica") + rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows + // Should see the write. + if len(rows) != expectedRows { + return fmt.Errorf("expected %d rows, but got %d", expectedRows, len(rows)), false } + return nil, false + } +} + +func verifyCanReadFromAllRepls( + ctx context.Context, + t *testing.T, + desc roachpb.RangeDescriptor, + ts hlc.Timestamp, + repls []*storage.Replica, + f respFunc, +) error { + t.Helper() + retryOptions := retry.Options{ + InitialBackoff: 500 * time.Microsecond, + MaxBackoff: 5 * time.Millisecond, + MaxRetries: 100, } + baRead := makeReadBatchRequestForDesc(desc, ts) + // The read should succeed once enough time (~300ms, but it's difficult to + // assert on that) has passed - on all replicas! 
+ g, ctx := errgroup.WithContext(ctx) + for i := range repls { + repl := repls[i] + func(r *storage.Replica) { + g.Go(func() (err error) { + var shouldRetry bool + for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); <-r.NextCh() { + if err, shouldRetry = f(repl.Send(ctx, baRead)); !shouldRetry { + return err + } + } + return err + }) + }(repls[i]) + } + return g.Wait() +} + +func makeReadBatchRequestForDesc( + desc roachpb.RangeDescriptor, ts hlc.Timestamp, +) roachpb.BatchRequest { + var baRead roachpb.BatchRequest + baRead.Header.RangeID = desc.RangeID + r := &roachpb.ScanRequest{} + r.Key = desc.StartKey.AsRawKey() + r.EndKey = desc.EndKey.AsRawKey() + baRead.Add(r) + baRead.Timestamp = ts + return baRead } diff --git a/pkg/storage/closedts/closedts.go b/pkg/storage/closedts/closedts.go index 5ff655e43989..073ce7af1d56 100644 --- a/pkg/storage/closedts/closedts.go +++ b/pkg/storage/closedts/closedts.go @@ -137,7 +137,6 @@ type Provider interface { Producer Notifyee Start() - CanServe(roachpb.NodeID, hlc.Timestamp, roachpb.RangeID, ctpb.Epoch, ctpb.LAI) bool MaxClosed(roachpb.NodeID, roachpb.RangeID, ctpb.Epoch, ctpb.LAI) hlc.Timestamp } diff --git a/pkg/storage/closedts/container/container_test.go b/pkg/storage/closedts/container/container_test.go index e8eb837a869b..f2c19928ae3d 100644 --- a/pkg/storage/closedts/container/container_test.go +++ b/pkg/storage/closedts/container/container_test.go @@ -143,11 +143,11 @@ func TestTwoNodes(t *testing.T) { }() // Initially, can't serve random things for either n1 or n2. - require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{}, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)), + require.True(t, c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)).IsEmpty(), ) - require.False(t, c1.Container.Provider.CanServe( - c2.NodeID, hlc.Timestamp{}, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)), + require.True(t, c1.Container.Provider.MaxClosed( + c2.NodeID, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)).IsEmpty(), ) // Track and release a command. @@ -173,8 +173,8 @@ func TestTwoNodes(t *testing.T) { // 0.1 - this is because it has no information about any ranges at that timestamp. // (Note that the Tracker may not have processed the closing yet, so if there were // a bug here, this test would fail flakily - that's ok). - require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{Logical: 1}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12)), + require.True(t, c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12)).IsEmpty(), ) // Two more commands come in. @@ -191,23 +191,23 @@ func TestTwoNodes(t *testing.T) { c1.TestClock.Tick(hlc.Timestamp{WallTime: 3E9}, ctpb.Epoch(1), nil) testutils.SucceedsSoon(t, func() error { - if !c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), - ) { + if c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), + ).Less(hlc.Timestamp{WallTime: 1E9}) { return errors.New("still can't serve") } return nil }) // Shouldn't be able to serve the same thing if we haven't caught up yet. 
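For reference, the check that the removed Provider.CanServe used to perform can be recovered from MaxClosed: a read at ts is servable exactly when the provider's max closed timestamp for that (node, range, epoch, LAI) is not less than ts, which is the shape of the rewritten assertions in this test. A minimal sketch, assuming the packages touched by this patch; the package and helper name canServeAt are invented for illustration:

package example // hypothetical; illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/closedts"
	"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// canServeAt reports whether a follower read at ts could be served, phrased in
// terms of MaxClosed rather than the removed CanServe.
func canServeAt(
	p closedts.Provider,
	nodeID roachpb.NodeID,
	ts hlc.Timestamp,
	rangeID roachpb.RangeID,
	epoch ctpb.Epoch,
	lai ctpb.LAI,
) bool {
	return !p.MaxClosed(nodeID, rangeID, epoch, lai).Less(ts)
}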
- require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(11), - )) + require.False(t, !c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(11), + ).Less(hlc.Timestamp{WallTime: 1E9})) // Shouldn't be able to serve at a higher timestamp. - require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9, Logical: 1}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), - )) + require.False(t, !c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), + ).Less(hlc.Timestamp{WallTime: 1E9, Logical: 1})) // Now things get a little more interesting. Tell node2 to get a stream of // information from node1. We do this via Request, which as a side effect lets @@ -226,9 +226,9 @@ func TestTwoNodes(t *testing.T) { // And n2 should soon also be able to serve follower reads for a range lead by // n1 when it has caught up. testutils.SucceedsSoon(t, func() error { - if !c2.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), - ) { + if c2.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), + ).Less(hlc.Timestamp{WallTime: 1E9}) { return errors.New("n2 still can't serve") } return nil @@ -249,28 +249,28 @@ func TestTwoNodes(t *testing.T) { {8, 88}, } { testutils.SucceedsSoon(t, func() error { - if !c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(1), tuple.LAI, - ) { + if c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(1), tuple.LAI, + ).Less(ts) { return errors.Errorf("n%d still can't serve (r%d,%d) @ %s", i+1, tuple.RangeID, tuple.LAI, ts) } return nil }) // Still can't serve when not caught up. - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(1), tuple.LAI-1, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(1), tuple.LAI-1, + ).Less(ts)) // Can serve when more than caught up. - require.True(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(1), tuple.LAI+1, - )) + require.True(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(1), tuple.LAI+1, + ).Less(ts)) // Can't serve when in different epoch, no matter larger or smaller. - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(0), tuple.LAI, - )) - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(2), tuple.LAI, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(0), tuple.LAI, + ).Less(ts)) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(2), tuple.LAI, + ).Less(ts)) } } } @@ -322,30 +322,30 @@ func TestTwoNodes(t *testing.T) { ts := hlc.Timestamp{WallTime: int64(container.StorageBucketScale) + 5E9} testutils.SucceedsSoon(t, func() error { - if !c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch, lai, - ) { + if c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch, lai, + ).Less(ts) { return errors.Errorf("n%d still can't serve (r%d,%d) @ %s", i+1, rangeID, lai, ts) } return nil }) // Still can't serve when not caught up. 
- require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch, lai-1, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch, lai-1, + ).Less(ts)) // Can serve when more than caught up. - require.True(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch, lai+1, - )) + require.True(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch, lai+1, + ).Less(ts)) // Can't serve when in different epoch, no matter larger or smaller. - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch-1, lai, - )) - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch+1, lai, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch-1, lai, + ).Less(ts)) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch+1, lai, + ).Less(ts)) } } diff --git a/pkg/storage/closedts/container/noop.go b/pkg/storage/closedts/container/noop.go index 49ebb81c4b3b..71fec28dd020 100644 --- a/pkg/storage/closedts/container/noop.go +++ b/pkg/storage/closedts/container/noop.go @@ -73,11 +73,6 @@ func (noopEverything) Notify(roachpb.NodeID) chan<- ctpb.Entry { } func (noopEverything) Subscribe(context.Context, chan<- ctpb.Entry) {} func (noopEverything) Start() {} -func (noopEverything) CanServe( - roachpb.NodeID, hlc.Timestamp, roachpb.RangeID, ctpb.Epoch, ctpb.LAI, -) bool { - return false -} func (noopEverything) MaxClosed( roachpb.NodeID, roachpb.RangeID, ctpb.Epoch, ctpb.LAI, ) hlc.Timestamp { diff --git a/pkg/storage/closedts/provider/provider.go b/pkg/storage/closedts/provider/provider.go index 4f4ebb4ee677..ea08ef523378 100644 --- a/pkg/storage/closedts/provider/provider.go +++ b/pkg/storage/closedts/provider/provider.go @@ -330,35 +330,12 @@ func (p *Provider) Subscribe(ctx context.Context, ch chan<- ctpb.Entry) { } } -// CanServe implements closedts.Provider. -func (p *Provider) CanServe( - nodeID roachpb.NodeID, ts hlc.Timestamp, rangeID roachpb.RangeID, epoch ctpb.Epoch, lai ctpb.LAI, -) bool { - var ok bool - p.cfg.Storage.VisitDescending(nodeID, func(entry ctpb.Entry) bool { - mlai, found := entry.MLAI[rangeID] - ctOK := !entry.ClosedTimestamp.Less(ts) - - ok = found && - ctOK && - entry.Epoch == epoch && - mlai <= lai - - // We're done either if we proved that the read is possible, or if we're - // already done looking at closed timestamps large enough to satisfy it. - done := ok || !ctOK - return done - }) - - return ok -} - // MaxClosed implements closedts.Provider. func (p *Provider) MaxClosed( nodeID roachpb.NodeID, rangeID roachpb.RangeID, epoch ctpb.Epoch, lai ctpb.LAI, ) hlc.Timestamp { var maxTS hlc.Timestamp - p.cfg.Storage.VisitDescending(nodeID, func(entry ctpb.Entry) bool { + p.cfg.Storage.VisitDescending(nodeID, func(entry ctpb.Entry) (done bool) { if mlai, found := entry.MLAI[rangeID]; found { if entry.Epoch == epoch && mlai <= lai { maxTS = entry.ClosedTimestamp diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index acb212dc2359..0c0f3b947899 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -313,6 +313,9 @@ type Replica struct { // we know that the replica has caught up. lastReplicaAdded roachpb.ReplicaID lastReplicaAddedTime time.Time + // initialMaxClosed is the initial maxClosed timestamp for the replica as known + // from its left-hand-side upon creation. + initialMaxClosed hlc.Timestamp // The most recently updated time for each follower of this range. 
This is updated // every time a Raft message is received from a peer. diff --git a/pkg/storage/replica_follower_read.go b/pkg/storage/replica_follower_read.go index 64501e5a9cfe..59bdcb747741 100644 --- a/pkg/storage/replica_follower_read.go +++ b/pkg/storage/replica_follower_read.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" ctstorage "github.com/cockroachdb/cockroach/pkg/storage/closedts/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -45,24 +46,18 @@ func (r *Replica) canServeFollowerRead( FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) && lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch { - r.mu.RLock() - lai := r.mu.state.LeaseAppliedIndex - r.mu.RUnlock() - canServeFollowerRead = r.store.cfg.ClosedTimestamp.Provider.CanServe( - lErr.LeaseHolder.NodeID, ba.Timestamp, r.RangeID, ctpb.Epoch(lErr.Lease.Epoch), ctpb.LAI(lai), - ) - + canServeFollowerRead = !r.maxClosed(ctx).Less(ba.Timestamp) if !canServeFollowerRead { - // We can't actually serve the read. Signal the clients that we want - // an update so that future requests can succeed. + // We can't actually serve the read based on the closed timestamp. + // Signal the clients that we want an update so that future requests can succeed. r.store.cfg.ClosedTimestamp.Clients.Request(lErr.LeaseHolder.NodeID, r.RangeID) if false { // NB: this can't go behind V(x) because the log message created by the // storage might be gigantic in real clusters, and we don't want to trip it // using logspy. - log.Warningf(ctx, "can't serve follower read for %s at epo %d lai %d, storage is %s", - ba.Timestamp, lErr.Lease.Epoch, lai, + log.Warningf(ctx, "can't serve follower read for %s at epo %d, storage is %s", + ba.Timestamp, lErr.Lease.Epoch, r.store.cfg.ClosedTimestamp.Storage.(*ctstorage.MultiStorage).StringForNodes(lErr.LeaseHolder.NodeID), ) } @@ -81,3 +76,23 @@ func (r *Replica) canServeFollowerRead( log.Event(ctx, "serving via follower read") return nil } + +// maxClosed returns the maximum closed timestamp for this range. +// It is computed as the most recent of the known closed timestamp for the +// current lease holder for this range as tracked by the closed timestamp +// subsystem and the start time of the current lease. It is safe to use the +// start time of the current lease because leasePostApply bumps the timestamp +// cache forward to at least the new lease start time. Using this combination +// allows the closed timestamp mechanism to be robust to lease transfers. 
+func (r *Replica) maxClosed(ctx context.Context) hlc.Timestamp {
+	r.mu.RLock()
+	lai := r.mu.state.LeaseAppliedIndex
+	lease := *r.mu.state.Lease
+	initialMaxClosed := r.mu.initialMaxClosed
+	r.mu.RUnlock()
+	maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed(
+		lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai))
+	maxClosed.Forward(lease.Start)
+	maxClosed.Forward(initialMaxClosed)
+	return maxClosed
+}
diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go
index f961920b958b..b425822f4459 100644
--- a/pkg/storage/replica_rangefeed.go
+++ b/pkg/storage/replica_rangefeed.go
@@ -24,7 +24,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
-	"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/intentresolver"
@@ -414,15 +413,8 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked() {
 		return
 	}
 
-	r.mu.RLock()
-	lai := r.mu.state.LeaseAppliedIndex
-	lease := *r.mu.state.Lease
-	r.mu.RUnlock()
-
 	// Determine what the maximum closed timestamp is for this replica.
-	closedTS := r.store.cfg.ClosedTimestamp.Provider.MaxClosed(
-		lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai),
-	)
+	closedTS := r.maxClosed(context.Background())
 
 	// If the closed timestamp is not empty, inform the Processor.
 	if closedTS.IsEmpty() {
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 3e8db1454dd0..f26283dfcb3e 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -2069,10 +2069,24 @@ func splitPostApply(
 	}
 
 	// Finish initialization of the RHS.
+
+	// This initialMaxClosed value is computed here to ensure that follower reads
+	// do not regress following the split. After the split occurs there will be no
+	// information in the closedts subsystem about the newly minted RHS range from
+	// its leaseholder's store. Furthermore, the RHS will have a lease start time
+	// equal to that of the LHS which might be quite old. This means that
+	// timestamps which follow the lease start time for the LHS but are below the
+	// current closed timestamp for the LHS would no longer be readable on the RHS
+	// after the split. It is critical that this call to maxClosed happen during
+	// splitPostApply so that it refers to a LAI that is equal to the index at
+	// which this lease was applied. If it were to refer to a LAI after the split
+	// then the value of initialMaxClosed might be unsafe.
+	initialMaxClosed := r.maxClosed(ctx)
 	r.mu.Lock()
 	rightRng.mu.Lock()
 	// Copy the minLeaseProposedTS from the LHS.
 	rightRng.mu.minLeaseProposedTS = r.mu.minLeaseProposedTS
+	rightRng.mu.initialMaxClosed = initialMaxClosed
 	rightLease := *rightRng.mu.state.Lease
 	rightRng.mu.Unlock()
 	r.mu.Unlock()
diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go
index 7d51f4489bb9..faf268d0de38 100644
--- a/pkg/testutils/serverutils/test_server_shim.go
+++ b/pkg/testutils/serverutils/test_server_shim.go
@@ -143,6 +143,10 @@ type TestServerInterface interface {
 		splitKey roachpb.Key,
 	) (left roachpb.RangeDescriptor, right roachpb.RangeDescriptor, err error)
 
+	// MergeRanges merges the range containing leftKey with the following adjacent
+	// range.
+ MergeRanges(leftKey roachpb.Key) (merged roachpb.RangeDescriptor, err error) + // ExpectedInitialRangeCount returns the expected number of ranges that should // be on the server after initial (asynchronous) splits have been completed, // assuming no additional information is added outside of the normal bootstrap
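A usage sketch for the new MergeRanges helper alongside the existing SplitRange on TestServerInterface. The package, function name, split key, and assertions are invented for illustration; only the SplitRange and MergeRanges signatures come from this patch:

package example // hypothetical; illustration only

import (
	"bytes"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

// splitThenMerge splits the range containing splitKey and then merges the two
// halves back together, verifying that the merged descriptor spans both.
func splitThenMerge(t *testing.T, ts serverutils.TestServerInterface, splitKey roachpb.Key) {
	left, right, err := ts.SplitRange(splitKey)
	if err != nil {
		t.Fatal(err)
	}
	// MergeRanges is addressed by the left-hand range's start key; the range to
	// its right is folded back in.
	merged, err := ts.MergeRanges(left.StartKey.AsRawKey())
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(merged.StartKey, left.StartKey) || !bytes.Equal(merged.EndKey, right.EndKey) {
		t.Fatalf("unexpected merged descriptor %v after re-merging %v and %v", merged, left, right)
	}
}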