Skip to content

Commit

Permalink
multiregionccl: add global tables roundtrip test
Browse files Browse the repository at this point in the history
This patch adds TestEnsureLocalReadsOnGlobalTables. which ensures that
global tables serve present time reads locally.

Release note: None
  • Loading branch information
arulajmani committed May 11, 2021
1 parent 3e76920 commit f72206d
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 2 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_test(
"multiregion_test.go",
"region_test.go",
"regional_by_row_test.go",
"roundtrips_test.go",
"show_test.go",
],
deps = [
Expand All @@ -39,6 +40,8 @@ go_test(
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvbase",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
Expand All @@ -58,6 +61,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
14 changes: 13 additions & 1 deletion pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
)

type multiRegionTestClusterParams struct {
baseDir string
baseDir string
replicationMode base.TestClusterReplicationMode
}

// MultiRegionTestClusterParamsOption is an option that can be passed to
Expand All @@ -36,6 +37,16 @@ func WithBaseDirectory(baseDir string) MultiRegionTestClusterParamsOption {
}
}

// WithReplicationMode is an option to control the replication mode for the
// created multi-region cluster.
func WithReplicationMode(
replicationMode base.TestClusterReplicationMode,
) MultiRegionTestClusterParamsOption {
return func(params *multiRegionTestClusterParams) {
params.replicationMode = replicationMode
}
}

// TestingCreateMultiRegionCluster creates a test cluster with numServers number
// of nodes and the provided testing knobs applied to each of the nodes. Every
// node is placed in its own locality, named "us-east1", "us-east2", and so on.
Expand Down Expand Up @@ -65,6 +76,7 @@ func TestingCreateMultiRegionCluster(
}

tc := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
ReplicationMode: params.replicationMode,
ServerArgsPerNode: serverArgs,
})

Expand Down
138 changes: 138 additions & 0 deletions pkg/ccl/multiregionccl/roundtrips_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package multiregionccl_test

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/stretchr/testify/require"
)

// TestEnsureLocalReadsOnGlobalTables ensures that all present time reads on
// GLOBAL tables don't incur a network hop.
func TestEnsureLocalReadsOnGlobalTables(t *testing.T) {
defer leaktest.AfterTest(t)()

// ensureOnlyLocalReads looks at a trace to ensure that reads were served
// locally. It returns true if the read was served as a follower read.
ensureOnlyLocalReads := func(t *testing.T, rec tracing.Recording) (servedUsingFollowerReads bool) {
for _, sp := range rec {
if sp.Operation == "dist sender send" {
require.True(t, tracing.LogsContainMsg(sp, kvbase.RoutingRequestLocallyMsg),
"query was not served locally: %s", rec)

// Check the child span to find out if the query was served using a
// follower read.
for _, span := range rec {
if span.ParentSpanID == sp.SpanID {
if tracing.LogsContainMsg(span, kvbase.FollowerReadServingMsg) {
servedUsingFollowerReads = true
}
}
}
}
}
return servedUsingFollowerReads
}

presentTimeRead := `SELECT * FROM t.test_table WHERE k=2`
recCh := make(chan tracing.Recording, 1)

knobs := base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
WithStatementTrace: func(trace tracing.Recording, stmt string) {
if stmt == presentTimeRead {
recCh <- trace
}
},
},
}

numServers := 3
tc, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, numServers, knobs, multiregionccltestutils.WithReplicationMode(base.ReplicationManual),
)
defer cleanup()

_, err := sqlDB.Exec(`CREATE DATABASE t PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"`)
require.NoError(t, err)
_, err = sqlDB.Exec(`CREATE TABLE t.test_table (k INT PRIMARY KEY) LOCALITY GLOBAL`)
require.NoError(t, err)

// Set up some write traffic in the background.
errCh := make(chan error)
stopWritesCh := make(chan struct{})
go func() {
i := 0
for {
select {
case <-stopWritesCh:
errCh <- nil
return
case <-time.After(10 * time.Millisecond):
_, err := sqlDB.Exec(`INSERT INTO t.test_table VALUES($1)`, i)
i++
if err != nil {
errCh <- err
return
}
}
}
}()

var tableID uint32
err = sqlDB.QueryRow(`SELECT id from system.namespace WHERE name='test_table'`).Scan(&tableID)
require.NoError(t, err)
tablePrefix := keys.MustAddr(keys.SystemSQLCodec.TablePrefix(tableID))
// Split the range at the start of the table and add a voter to all nodes in
// the cluster.
tc.SplitRangeOrFatal(t, tablePrefix.AsRawKey())
tc.AddVotersOrFatal(t, tablePrefix.AsRawKey(), tc.Target(1), tc.Target(2))

for i := 0; i < numServers; i++ {
// Run a query to populate its cache.
conn := tc.ServerConn(i)
_, err = conn.Exec("SELECT * from t.test_table WHERE k=1")
require.NoError(t, err)

// Check that the cache was indeed populated.
cache := tc.Server(i).DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache()
entry := cache.GetCached(context.Background(), tablePrefix, false /* inverted */)
require.NotNil(t, entry.Lease().Empty())
require.NotNil(t, entry)
require.Equal(t, roachpb.LEAD_FOR_GLOBAL_READS, entry.ClosedTimestampPolicy())
isLeaseHolder := entry.Lease().Replica.NodeID == tc.Server(i).NodeID()

// Run the query to ensure local read.
_, err = conn.Exec(presentTimeRead)
require.NoError(t, err)

rec := <-recCh
followerRead := ensureOnlyLocalReads(t, rec)

// Expect every non-leaseholder to serve a (local) follower read. The
// leaseholder on the other hand won't serve a follower read.
require.Equal(t, !isLeaseHolder, followerRead)
}

close(stopWritesCh)
writeErr := <-errCh
require.NoError(t, writeErr)
}
4 changes: 4 additions & 0 deletions pkg/kv/kvbase/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ package kvbase
// FollowerReadServingMsg is a log message that needs to be used for tests in
// other packages.
const FollowerReadServingMsg = "serving via follower read"

// RoutingRequestLocallyMsg is a log message that needs to be used for tests in
// other packages.
const RoutingRequestLocallyMsg = "sending request to local client"
1 change: 1 addition & 0 deletions pkg/rpc/nodedialer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvbase",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/roachpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"unsafe"

circuit "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv/kvbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -121,7 +122,7 @@ func (n *Dialer) DialInternalClient(
return nil, nil, err
}
if localClient := n.rpcContext.GetLocalInternalClientForAddr(addr.String(), nodeID); localClient != nil {
log.VEvent(ctx, 2, "sending request to local client")
log.VEvent(ctx, 2, kvbase.RoutingRequestLocallyMsg)

// Create a new context from the existing one with the "local request" field set.
// This tells the handler that this is an in-process request, bypassing ctx.Peer checks.
Expand Down

0 comments on commit f72206d

Please sign in to comment.