diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index 2fe103f93115..4fb5df42e3b1 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -27,6 +27,7 @@ go_test( "multiregion_test.go", "region_test.go", "regional_by_row_test.go", + "roundtrips_test.go", "show_test.go", ], deps = [ @@ -39,6 +40,8 @@ go_test( "//pkg/jobs", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvbase", + "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security", @@ -58,6 +61,7 @@ go_test( "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/syncutil", + "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go b/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go index 329eb043614f..70840e16f393 100644 --- a/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go +++ b/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go @@ -21,7 +21,8 @@ import ( ) type multiRegionTestClusterParams struct { - baseDir string + baseDir string + replicationMode base.TestClusterReplicationMode } // MultiRegionTestClusterParamsOption is an option that can be passed to @@ -36,6 +37,16 @@ func WithBaseDirectory(baseDir string) MultiRegionTestClusterParamsOption { } } +// WithReplicationMode is an option to control the replication mode for the +// created multi-region cluster. +func WithReplicationMode( + replicationMode base.TestClusterReplicationMode, +) MultiRegionTestClusterParamsOption { + return func(params *multiRegionTestClusterParams) { + params.replicationMode = replicationMode + } +} + // TestingCreateMultiRegionCluster creates a test cluster with numServers number // of nodes and the provided testing knobs applied to each of the nodes. Every // node is placed in its own locality, named "us-east1", "us-east2", and so on. @@ -65,6 +76,7 @@ func TestingCreateMultiRegionCluster( } tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{ + ReplicationMode: params.replicationMode, ServerArgsPerNode: serverArgs, }) diff --git a/pkg/ccl/multiregionccl/roundtrips_test.go b/pkg/ccl/multiregionccl/roundtrips_test.go new file mode 100644 index 000000000000..19d0b0522190 --- /dev/null +++ b/pkg/ccl/multiregionccl/roundtrips_test.go @@ -0,0 +1,138 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package multiregionccl_test + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/stretchr/testify/require" +) + +// TestEnsureLocalReadsOnGlobalTables ensures that all present time reads on +// GLOBAL tables don't incur a network hop. +func TestEnsureLocalReadsOnGlobalTables(t *testing.T) { + defer leaktest.AfterTest(t)() + + // ensureOnlyLocalReads looks at a trace to ensure that reads were served + // locally. It returns true if the read was served as a follower read. + ensureOnlyLocalReads := func(t *testing.T, rec tracing.Recording) (servedUsingFollowerReads bool) { + for _, sp := range rec { + if sp.Operation == "dist sender send" { + require.True(t, tracing.LogsContainMsg(sp, kvbase.RoutingRequestLocallyMsg), + "query was not served locally: %s", rec) + + // Check the child span to find out if the query was served using a + // follower read. + for _, span := range rec { + if span.ParentSpanID == sp.SpanID { + if tracing.LogsContainMsg(span, kvbase.FollowerReadServingMsg) { + servedUsingFollowerReads = true + } + } + } + } + } + return servedUsingFollowerReads + } + + presentTimeRead := `SELECT * FROM t.test_table WHERE k=2` + recCh := make(chan tracing.Recording, 1) + + knobs := base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + WithStatementTrace: func(trace tracing.Recording, stmt string) { + if stmt == presentTimeRead { + recCh <- trace + } + }, + }, + } + + numServers := 3 + tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( + t, numServers, knobs, multiregionccltestutils.WithReplicationMode(base.ReplicationManual), + ) + defer cleanup() + + _, err := sqlDB.Exec(`CREATE DATABASE t PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"`) + require.NoError(t, err) + _, err = sqlDB.Exec(`CREATE TABLE t.test_table (k INT PRIMARY KEY) LOCALITY GLOBAL`) + require.NoError(t, err) + + // Set up some write traffic in the background. + errCh := make(chan error) + stopWritesCh := make(chan struct{}) + go func() { + i := 0 + for { + select { + case <-stopWritesCh: + errCh <- nil + return + case <-time.After(10 * time.Millisecond): + _, err := sqlDB.Exec(`INSERT INTO t.test_table VALUES($1)`, i) + i++ + if err != nil { + errCh <- err + return + } + } + } + }() + + var tableID uint32 + err = sqlDB.QueryRow(`SELECT id from system.namespace WHERE name='test_table'`).Scan(&tableID) + require.NoError(t, err) + tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID)) + // Split the range at the start of the table and add a voter to all nodes in + // the cluster. + tc.SplitRangeOrFatal(t, tablePrefix.AsRawKey()) + tc.AddVotersOrFatal(t, tablePrefix.AsRawKey(), tc.Target(1), tc.Target(2)) + + for i := 0; i < numServers; i++ { + // Run a query to populate its cache. + conn := tc.ServerConn(i) + _, err = conn.Exec("SELECT * from t.test_table WHERE k=1") + require.NoError(t, err) + + // Check that the cache was indeed populated. + cache := tc.Server(i).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache() + entry := cache.GetCached(context.Background(), tablePrefix, false /* inverted */) + require.NotNil(t, entry.Lease().Empty()) + require.NotNil(t, entry) + require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, entry.ClosedTimestampPolicy()) + isLeaseHolder := entry.Lease().Replica.NodeID == tc.Server(i).NodeID() + + // Run the query to ensure local read. + _, err = conn.Exec(presentTimeRead) + require.NoError(t, err) + + rec := <-recCh + followerRead := ensureOnlyLocalReads(t, rec) + + // Expect every non-leaseholder to serve a (local) follower read. The + // leaseholder on the other hand won't serve a follower read. + require.Equal(t, !isLeaseHolder, followerRead) + } + + close(stopWritesCh) + writeErr := <-errCh + require.NoError(t, writeErr) +} diff --git a/pkg/kv/kvbase/constants.go b/pkg/kv/kvbase/constants.go index 7d0fdfc714bb..e0a185c32f2a 100644 --- a/pkg/kv/kvbase/constants.go +++ b/pkg/kv/kvbase/constants.go @@ -13,3 +13,7 @@ package kvbase // FollowerReadServingMsg is a log message that needs to be used for tests in // other packages. const FollowerReadServingMsg = "serving via follower read" + +// RoutingRequestLocallyMsg is a log message that needs to be used for tests in +// other packages. +const RoutingRequestLocallyMsg = "sending request to local client" diff --git a/pkg/rpc/nodedialer/BUILD.bazel b/pkg/rpc/nodedialer/BUILD.bazel index b4cf69f441be..2705eb0535df 100644 --- a/pkg/rpc/nodedialer/BUILD.bazel +++ b/pkg/rpc/nodedialer/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer", visibility = ["//visibility:public"], deps = [ + "//pkg/kv/kvbase", "//pkg/kv/kvserver/closedts", "//pkg/kv/kvserver/closedts/ctpb", "//pkg/roachpb", diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index b395df72eafd..1d42d308d557 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -18,6 +18,7 @@ import ( "unsafe" circuit "github.com/cockroachdb/circuitbreaker" + "github.com/cockroachdb/cockroach/pkg/kv/kvbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -121,7 +122,7 @@ func (n *Dialer) DialInternalClient( return nil, nil, err } if localClient := n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID); localClient != nil { - log.VEvent(ctx, 2, "sending request to local client") + log.VEvent(ctx, 2, kvbase.RoutingRequestLocallyMsg) // Create a new context from the existing one with the "local request" field set. // This tells the handler that this is an in-process request, bypassing ctx.Peer checks.