Skip to content

Commit

Permalink
Merge pull request #106776 from j82w/backport23.1-106587
Browse files Browse the repository at this point in the history
release-23.1: sql: fix StatementStatistics.Nodes list
  • Loading branch information
j82w authored Jul 14, 2023
2 parents c93ce66 + 9499079 commit 3fc2c00
Show file tree
Hide file tree
Showing 18 changed files with 229 additions and 113 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/testccl/sqlstatsccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/appstatspb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
174 changes: 134 additions & 40 deletions pkg/ccl/testccl/sqlstatsccl/sql_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,33 @@ import (
gosql "database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/appstatspb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSQLStatsRegions(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "test is to slow for race")
skip.UnderStress(t, "test is too heavy to run under stress")

// We build a small multiregion cluster, with the proper settings for
Expand All @@ -44,30 +51,78 @@ func TestSQLStatsRegions(t *testing.T) {
sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(ctx, &st.SV, true)
sql.SecondaryTenantZoneConfigsEnabled.Override(ctx, &st.SV, true)

numServers := 9
numServers := 3
regionNames := []string{
"gcp-us-west1",
"gcp-us-central1",
"gcp-us-east1",
}

serverArgs := make(map[int]base.TestServerArgs)
signalAfter := make([]chan struct{}, numServers)
for i := 0; i < numServers; i++ {
serverArgs[i] = base.TestServerArgs{
signalAfter[i] = make(chan struct{})
args := base.TestServerArgs{
Settings: st,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionNames[i%len(regionNames)]}},
},
// We'll start our own test tenant manually below.
DisableDefaultTestTenant: true,
}

serverKnobs := &server.TestingKnobs{
SignalAfterGettingRPCAddress: signalAfter[i],
}

args.Knobs.Server = serverKnobs
serverArgs[i] = args
}

host := testcluster.StartTestCluster(t, numServers, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
ParallelStart: true,
})
defer host.Stopper().Stop(ctx)

go func() {
for _, c := range signalAfter {
<-c
}
}()

tdb := sqlutils.MakeSQLRunner(host.ServerConn(1))

// Shorten the closed timestamp target duration so that span configs
// propagate more rapidly.
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '200ms'`)
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.load_based_rebalancing = off")
tdb.Exec(t, "SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '10ms'")
// Lengthen the lead time for the global tables to prevent overload from
// resulting in delays in propagating closed timestamps and, ultimately
// forcing requests from being redirected to the leaseholder. Without this
// change, the test sometimes is flakey because the latency budget allocated
// to closed timestamp propagation proves to be insufficient. This value is
// very cautious, and makes this already slow test even slower.
tdb.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '50 ms'")
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '1500ms'`)
tdb.Exec(t, `ALTER TENANT ALL SET CLUSTER SETTING spanconfig.reconciliation_job.checkpoint_interval = '500ms'`)

tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true")
tdb.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.multi_region.allow_abstractions_for_secondary_tenants.enabled = true")
tdb.Exec(t, `ALTER RANGE meta configure zone using constraints = '{"+region=gcp-us-west1": 1, "+region=gcp-us-central1": 1, "+region=gcp-us-east1": 1}';`)

// Create secondary tenants
var tenantDbs []*gosql.DB
for _, server := range host.Servers {
_, tenantDb := serverutils.StartTenant(t, server, base.TestTenantArgs{
Settings: st,
TenantID: roachpb.MustMakeTenantID(11),
Locality: *server.Locality(),
})
tenantDbs = append(tenantDbs, tenantDb)
}

testCases := []struct {
name string
db func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner
Expand All @@ -84,24 +139,16 @@ func TestSQLStatsRegions(t *testing.T) {
// connection to the first one.
name: "secondary tenant",
db: func(t *testing.T, host *testcluster.TestCluster, st *cluster.Settings) *sqlutils.SQLRunner {
var dbs []*gosql.DB
for _, server := range host.Servers {
_, db := serverutils.StartTenant(t, server, base.TestTenantArgs{
Settings: st,
TenantID: roachpb.MustMakeTenantID(11),
Locality: *server.Locality(),
})
dbs = append(dbs, db)
}
return sqlutils.MakeSQLRunner(dbs[0])
return sqlutils.MakeSQLRunner(tenantDbs[1])
},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
db := tc.db(t, host, st)

// Create a multi-region database.
db.Exec(t, "SET enable_multiregion_placement_policy = true")
db.Exec(t, `SET CLUSTER SETTING sql.txn_stats.sample_rate = 1;`)

db.Exec(t, fmt.Sprintf(`CREATE DATABASE testdb PRIMARY REGION "%s" PLACEMENT RESTRICTED`, regionNames[0]))
for i := 1; i < len(regionNames); i++ {
db.Exec(t, fmt.Sprintf(`ALTER DATABASE testdb ADD region "%s"`, regionNames[i]))
Expand All @@ -113,39 +160,86 @@ func TestSQLStatsRegions(t *testing.T) {

// Add some data to each region.
for i, regionName := range regionNames {
db.Exec(t, "INSERT INTO test (crdb_region, a) VALUES ($1, $2)", regionName, i)
db.Exec(t, "INSERT INTO test (a, crdb_region) VALUES ($1, $2)", i, regionName)
}

// Select from the table and see what statement statistics were written.
db.Exec(t, "SET application_name = $1", t.Name())
db.Exec(t, "SELECT * FROM test")
row := db.QueryRow(t, `
// It takes a while for the region replication to complete.
testutils.SucceedsWithin(t, func() error {
var expectedNodes []int64
var expectedRegions []string
_, err := db.DB.ExecContext(ctx, `USE testdb`)
if err != nil {
return err
}

// Use EXPLAIN ANALYSE (DISTSQL) to get the accurate list of nodes.
explainInfo, err := db.DB.QueryContext(ctx, `EXPLAIN ANALYSE (DISTSQL) SELECT * FROM test`)
if err != nil {
return err
}
for explainInfo.Next() {
var explainStr string
if err := explainInfo.Scan(&explainStr); err != nil {
t.Fatal(err)
}

explainStr = strings.ReplaceAll(explainStr, " ", "")
// Example str " regions: cp-us-central1,gcp-us-east1,gcp-us-west1"
if strings.HasPrefix(explainStr, "regions:") {
explainStr = strings.ReplaceAll(explainStr, "regions:", "")
explainStr = strings.ReplaceAll(explainStr, " ", "")
expectedRegions = strings.Split(explainStr, ",")
if len(expectedRegions) < len(regionNames) {
return fmt.Errorf("rows are not replicated to all regions %s\n", expectedRegions)
}
}

// Example str " nodes: n1, n2, n4, n9"
if strings.HasPrefix(explainStr, "nodes:") {
explainStr = strings.ReplaceAll(explainStr, "nodes:", "")
explainStr = strings.ReplaceAll(explainStr, "n", "")

split := strings.Split(explainStr, ",")
if len(split) < len(regionNames) {
return fmt.Errorf("rows are not replicated to all regions %s\n", split)
}

// Gateway node was not included in the explain plan. Add it to the list
if split[0] != "1" {
expectedNodes = append(expectedNodes, int64(1))
}

for _, val := range split {
node, err := strconv.Atoi(val)
require.NoError(t, err)
expectedNodes = append(expectedNodes, int64(node))
}
}
}

// Select from the table and see what statement statistics were written.
db.Exec(t, "SET application_name = $1", t.Name())
db.Exec(t, "SELECT * FROM test")
row := db.QueryRow(t, `
SELECT statistics->>'statistics'
FROM crdb_internal.statement_statistics
WHERE app_name = $1`, t.Name())

var actualJSON string
row.Scan(&actualJSON)
var actual appstatspb.StatementStatistics
err := json.Unmarshal([]byte(actualJSON), &actual)
require.NoError(t, err)

require.Equal(t,
appstatspb.StatementStatistics{
// TODO(todd): It appears we do not yet reliably record
// the nodes for the statement. (I have manually verified
// that the above query does indeed fan out across the
// regions, via EXPLAIN (DISTSQL).) Filed as #96647.
//Nodes: []int64{1, 2, 3},
//Regions: regionNames,
Nodes: []int64{1},
Regions: []string{regionNames[0]},
},
appstatspb.StatementStatistics{
Nodes: actual.Nodes,
Regions: actual.Regions,
},
)
var actualJSON string
row.Scan(&actualJSON)
var actual appstatspb.StatementStatistics
err = json.Unmarshal([]byte(actualJSON), &actual)
require.NoError(t, err)

// Replication to all regions can take some time to complete. During
// this time a incomplete list will be returned.
if !assert.ObjectsAreEqual(expectedNodes, actual.Nodes) {
return fmt.Errorf("nodes are not equal. Expected: %d, Actual: %d", expectedNodes, actual.Nodes)
}

require.Equal(t, expectedRegions, actual.Regions)
return nil
}, 3*time.Minute)
})
}
}
9 changes: 3 additions & 6 deletions pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coldataext"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -475,9 +474,7 @@ func (s *vectorizedFlowCreator) wrapWithNetworkVectorizedStatsCollector(
// statistics that the outbox is responsible for, nil is returned if stats are
// not being collected.
func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox(
flowCtx *execinfra.FlowCtx,
statsCollectors []colexecop.VectorizedStatsCollector,
originSQLInstanceID base.SQLInstanceID,
flowCtx *execinfra.FlowCtx, statsCollectors []colexecop.VectorizedStatsCollector,
) func(context.Context) []*execinfrapb.ComponentStats {
if !s.recordingStats {
return nil
Expand All @@ -497,7 +494,7 @@ func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox(
// whole flow from parent monitors. These stats are added to a
// flow-level span.
result = append(result, &execinfrapb.ComponentStats{
Component: execinfrapb.FlowComponentID(originSQLInstanceID, flowCtx.ID),
Component: flowCtx.FlowComponentID(),
FlowStats: execinfrapb.FlowStats{
MaxMemUsage: optional.MakeUint(uint64(flowCtx.Mon.MaximumBytes())),
MaxDiskUsage: optional.MakeUint(uint64(flowCtx.DiskMonitor.MaximumBytes())),
Expand Down Expand Up @@ -1100,7 +1097,7 @@ func (s *vectorizedFlowCreator) setupOutput(
// Set up an Outbox.
outbox, err := s.setupRemoteOutputStream(
ctx, flowCtx, pspec.ProcessorID, opWithMetaInfo, opOutputTypes, outputStream, factory,
s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors, outputStream.OriginNodeID),
s.makeGetStatsFnForOutbox(flowCtx, opWithMetaInfo.StatsCollectors),
)
if err != nil {
return err
Expand Down
13 changes: 6 additions & 7 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1702,15 +1702,15 @@ func (ex *connExecutor) dispatchToExecutionEngine(
ppInfo.dispatchToExecutionEngine.cleanup.appendFunc(namedFunc{
fName: "populate query level stats and regions",
f: func() {
populateQueryLevelStatsAndRegions(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
populateQueryLevelStats(ctx, &curPlanner, ex.server.cfg, ppInfo.dispatchToExecutionEngine.queryStats, &ex.cpuStatsCollector)
ppInfo.dispatchToExecutionEngine.stmtFingerprintID = ex.recordStatementSummary(
ctx, &curPlanner,
int(ex.state.mu.autoRetryCounter), ppInfo.dispatchToExecutionEngine.rowsAffected, ppInfo.curRes.ErrAllowReleased(), *ppInfo.dispatchToExecutionEngine.queryStats,
)
},
})
} else {
populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
populateQueryLevelStats(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)
stmtFingerprintID = ex.recordStatementSummary(
ctx, planner,
int(ex.state.mu.autoRetryCounter), res.RowsAffected(), res.Err(), stats,
Expand Down Expand Up @@ -1758,12 +1758,11 @@ func (ex *connExecutor) dispatchToExecutionEngine(
return err
}

// populateQueryLevelStatsAndRegions collects query-level execution statistics
// populateQueryLevelStats collects query-level execution statistics
// and populates it in the instrumentationHelper's queryLevelStatsWithErr field.
// Query-level execution statistics are collected using the statement's trace
// and the plan's flow metadata. It also populates the regions field and
// annotates the explainPlan field of the instrumentationHelper.
func populateQueryLevelStatsAndRegions(
// and the plan's flow metadata.
func populateQueryLevelStats(
ctx context.Context,
p *planner,
cfg *ExecutorConfig,
Expand Down Expand Up @@ -1805,7 +1804,7 @@ func populateQueryLevelStatsAndRegions(
}
}
if ih.traceMetadata != nil && ih.explainPlan != nil {
ih.regions = ih.traceMetadata.annotateExplain(
ih.traceMetadata.annotateExplain(
ih.explainPlan,
trace,
cfg.TestingKnobs.DeterministicExplain,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (p *planner) maybeLogStatementInternal(
ApplyJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
ContentionNanos: queryLevelStats.ContentionTime.Nanoseconds(),
Regions: p.curPlan.instrumentation.regions,
Regions: queryLevelStats.Regions,
NetworkBytesSent: queryLevelStats.NetworkBytesSent,
MaxMemUsage: queryLevelStats.MaxMemUsage,
MaxDiskUsage: queryLevelStats.MaxDiskUsage,
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/execinfra/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,9 @@ func (flowCtx *FlowCtx) ProcessorComponentID(procID int32) execinfrapb.Component
func (flowCtx *FlowCtx) StreamComponentID(streamID execinfrapb.StreamID) execinfrapb.ComponentID {
return execinfrapb.StreamComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, streamID)
}

// FlowComponentID returns a ComponentID for the given flow.
func (flowCtx *FlowCtx) FlowComponentID() execinfrapb.ComponentID {
region, _ := flowCtx.Cfg.Locality.Find("region")
return execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID, region)
}
3 changes: 2 additions & 1 deletion pkg/sql/execinfrapb/component_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ func StreamComponentID(
}

// FlowComponentID returns a ComponentID for the given flow.
func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID) ComponentID {
func FlowComponentID(instanceID base.SQLInstanceID, flowID FlowID, region string) ComponentID {
return ComponentID{
FlowID: flowID,
Type: ComponentID_FLOW,
SQLInstanceID: instanceID,
Region: region,
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/execinfrapb/component_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ message ComponentID {
(gogoproto.nullable) = false,
(gogoproto.customname) = "SQLInstanceID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/base.SQLInstanceID"];

// The region the component is associated with.
// For the initial implementation only ComponentIDs of Flow type might have this set.
optional string region = 5 [(gogoproto.nullable) = false];
}

// ComponentStats contains statistics for an execution component. A component is
Expand Down
Loading

0 comments on commit 3fc2c00

Please sign in to comment.