Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: report contention on writes in EXPLAIN ANALYZE #106354

Merged
merged 1 commit on Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/sql/opt/exec/explain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ go_test(
embed = [":explain"],
deps = [
"//pkg/base",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/execinfra",
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/opt/exec/explain/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

func TestMain(m *testing.M) {
securityassets.SetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
os.Exit(m.Run())
}
129 changes: 120 additions & 9 deletions pkg/sql/opt/exec/explain/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -130,8 +128,6 @@ func TestMaxDiskSpillUsage(t *testing.T) {
distSQLKnobs := &execinfra.TestingKnobs{}
distSQLKnobs.ForceDiskSpill = true
testClusterArgs.ServerArgs.Knobs.DistSQL = distSQLKnobs
testClusterArgs.ServerArgs.Insecure = true
serverutils.InitTestServerFactory(server.TestServerFactory)
tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
Expand All @@ -150,9 +146,6 @@ CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, 10) AS
for rows.Next() {
var res string
assert.NoError(t, rows.Scan(&res))
var sb strings.Builder
sb.WriteString(res)
sb.WriteByte('\n')
if matches := re.FindStringSubmatch(res); len(matches) > 0 {
return true
}
Expand Down Expand Up @@ -184,10 +177,8 @@ func TestCPUTimeEndToEnd(t *testing.T) {
distSQLKnobs := &execinfra.TestingKnobs{}
distSQLKnobs.ForceDiskSpill = true
testClusterArgs.ServerArgs.Knobs.DistSQL = distSQLKnobs
testClusterArgs.ServerArgs.Insecure = true
const numNodes = 3

serverutils.InitTestServerFactory(server.TestServerFactory)
tc := testcluster.StartTestCluster(t, numNodes, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
Expand Down Expand Up @@ -245,3 +236,123 @@ func TestCPUTimeEndToEnd(t *testing.T) {
runQuery("SELECT * FROM (VALUES (1), (2), (3)) v(a) INNER LOOKUP JOIN t ON a = x", false /* hideCPU */)
runQuery("SELECT count(*) FROM generate_series(1, 100000)", false /* hideCPU */)
}

// TestContentionTimeOnWrites verifies that the contention encountered during a
// mutation is reported on EXPLAIN ANALYZE output.
func TestContentionTimeOnWrites(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
	ctx := context.Background()
	defer tc.Stopper().Stop(ctx)

	runner := sqlutils.MakeSQLRunner(tc.Conns[0])
	runner.Exec(t, "CREATE TABLE t (k INT PRIMARY KEY, v INT)")

	// The test involves three goroutines:
	// - the main goroutine acts as the coordinator. It first waits for worker 1
	//   to perform its mutation in an open txn, then blocks until worker 2
	//   begins executing its mutation, then unblocks worker 1 and waits for
	//   both workers to exit.
	// - worker 1 goroutine performs a mutation without committing a txn. It
	//   notifies the main goroutine by closing 'sem' once the mutation has been
	//   performed. It then blocks until 'commitCh' is closed by the main
	//   goroutine which allows worker 2 to experience contention.
	// - worker 2 goroutine performs a mutation via EXPLAIN ANALYZE. This query
	//   will be blocked until worker 1 commits its txn, so it should see
	//   contention time reported in the output.

	sem := make(chan struct{})
	errCh := make(chan error, 1)
	commitCh := make(chan struct{})
	go func() {
		defer close(errCh)
		// Ensure that sem is always closed (in case we encounter an error
		// before the mutation is performed).
		var closedSem bool
		defer func() {
			if !closedSem {
				close(sem)
			}
		}()
		txn, err := tc.Conns[0].Begin()
		if err != nil {
			errCh <- err
			return
		}
		_, err = txn.Exec("INSERT INTO t VALUES (1, 1)")
		if err != nil {
			errCh <- err
			return
		}
		// Notify the main goroutine that the mutation has been performed.
		close(sem)
		closedSem = true
		// Block until the main goroutine tells us that we're good to commit.
		<-commitCh
		if err = txn.Commit(); err != nil {
			errCh <- err
			return
		}
	}()

	// Block until the mutation of worker 1 is done.
	<-sem
	// Check that no error was encountered before that.
	select {
	case err := <-errCh:
		t.Fatal(err)
	default:
	}

	// foundContention is written only by worker 2 and read by the main
	// goroutine after errCh2 is closed, so the channel close provides the
	// necessary happens-before edge.
	var foundContention bool
	errCh2 := make(chan error, 1)
	go func() {
		defer close(errCh2)
		// Execute the mutation via EXPLAIN ANALYZE and check whether the
		// contention is reported.
		contentionRE := regexp.MustCompile(`cumulative time spent due to contention.*`)
		rows := runner.Query(t, "EXPLAIN ANALYZE UPSERT INTO t VALUES (1, 2)")
		defer func() {
			_ = rows.Close()
		}()
		for rows.Next() {
			var line string
			if err := rows.Scan(&line); err != nil {
				errCh2 <- err
				return
			}
			if contentionRE.MatchString(line) {
				foundContention = true
			}
		}
		// Surface any error encountered during iteration: without this check
		// a failed query would be silently misreported as "no contention
		// found" by the final assertion.
		if err := rows.Err(); err != nil {
			errCh2 <- err
		}
	}()

	// Continuously poll the cluster queries until we see that the query that
	// should be experiencing contention has started executing.
	for {
		row := runner.QueryRow(t, "SELECT count(*) FROM [SHOW CLUSTER QUERIES] WHERE query LIKE '%EXPLAIN ANALYZE UPSERT%'")
		var count int
		row.Scan(&count)
		// Sleep for non-trivial amount of time to allow for worker 2 to start
		// (if it hasn't already) and to experience the contention (if it has
		// started).
		time.Sleep(time.Second)
		if count == 2 {
			// We stop polling once we see 2 queries matching the LIKE pattern:
			// the mutation query from worker 2 and the polling query itself.
			break
		}
	}

	// Allow worker 1 to commit which should unblock both workers.
	close(commitCh)

	// Wait for both workers to exit. Also perform sanity checks that the
	// workers didn't run into any errors.
	err := <-errCh
	require.NoError(t, err)
	err = <-errCh2
	require.NoError(t, err)

	// Meat of the test - verify that the contention was reported.
	require.True(t, foundContention)
}
12 changes: 7 additions & 5 deletions pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type planNodeToRowSource struct {
// run time state machine values
row rowenc.EncDatumRow

contentionEventsListener execstats.ContentionEventsListener
tenantConsumptionListener execstats.TenantConsumptionListener
}

Expand Down Expand Up @@ -168,7 +169,7 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS
}

func (p *planNodeToRowSource) Start(ctx context.Context) {
ctx = p.StartInternal(ctx, nodeName(p.node), &p.tenantConsumptionListener)
ctx = p.StartInternal(ctx, nodeName(p.node), &p.contentionEventsListener, &p.tenantConsumptionListener)
p.params.ctx = ctx
// This starts all of the nodes below this node.
if err := startExec(p.params, p.node); err != nil {
Expand Down Expand Up @@ -260,13 +261,14 @@ func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetad

// execStatsForTrace implements ProcessorBase.ExecStatsForTrace.
func (p *planNodeToRowSource) execStatsForTrace() *execinfrapb.ComponentStats {
// Propagate RUs from IO requests.
// TODO(drewk): we should consider propagating other stats for planNode
// operators.
if p.tenantConsumptionListener.ConsumedRU == 0 {
// Propagate contention time and RUs from IO requests.
if p.contentionEventsListener.CumulativeContentionTime == 0 && p.tenantConsumptionListener.ConsumedRU == 0 {
return nil
}
return &execinfrapb.ComponentStats{
KV: execinfrapb.KVStats{
ContentionTime: optional.MakeTimeValue(p.contentionEventsListener.CumulativeContentionTime),
},
Exec: execinfrapb.ExecStats{
ConsumedRU: optional.MakeUint(p.tenantConsumptionListener.ConsumedRU),
},
Expand Down