sql: report contention on writes in EXPLAIN ANALYZE
This commit adds the contention events listener to `planNodeToRowSource`,
which allows contention time information for mutation planNodes to be
shown in EXPLAIN ANALYZE output.

Release note (sql change): CockroachDB now reports contention time
encountered while executing mutation statements (INSERT, UPSERT, UPDATE,
DELETE) when run via EXPLAIN ANALYZE.
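For illustration, the behavior can be sketched with two SQL sessions. This mirrors the new test below; the output excerpt is hypothetical and the exact plan shape and timing will vary, but the contention line matches the string the test looks for:

-- Session 1: write to k = 1 and leave the transaction open so the
-- row stays locked.
BEGIN;
INSERT INTO t VALUES (1, 1);

-- Session 2: this mutation blocks until session 1 commits, and its
-- EXPLAIN ANALYZE output then includes a line like:
EXPLAIN ANALYZE UPSERT INTO t VALUES (1, 2);
--   ...
--   cumulative time spent due to contention: 1.2s
--   ...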
yuzefovich committed Jul 7, 2023
1 parent f788417 commit 628a1a7
Showing 4 changed files with 132 additions and 14 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/opt/exec/explain/BUILD.bazel
@@ -60,6 +60,8 @@ go_test(
embed = [":explain"],
deps = [
"//pkg/base",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/execinfra",
3 changes: 3 additions & 0 deletions pkg/sql/opt/exec/explain/main_test.go
@@ -14,11 +14,14 @@ import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

func TestMain(m *testing.M) {
securityassets.SetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
os.Exit(m.Run())
}
129 changes: 120 additions & 9 deletions pkg/sql/opt/exec/explain/output_test.go
@@ -19,14 +19,12 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/opt/exec/explain"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
@@ -130,8 +128,6 @@ func TestMaxDiskSpillUsage(t *testing.T) {
distSQLKnobs := &execinfra.TestingKnobs{}
distSQLKnobs.ForceDiskSpill = true
testClusterArgs.ServerArgs.Knobs.DistSQL = distSQLKnobs
testClusterArgs.ServerArgs.Insecure = true
serverutils.InitTestServerFactory(server.TestServerFactory)
tc := testcluster.StartTestCluster(t, 1, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
@@ -150,9 +146,6 @@ CREATE TABLE t (a PRIMARY KEY, b) AS SELECT i, i FROM generate_series(1, 10) AS
for rows.Next() {
var res string
assert.NoError(t, rows.Scan(&res))
var sb strings.Builder
sb.WriteString(res)
sb.WriteByte('\n')
if matches := re.FindStringSubmatch(res); len(matches) > 0 {
return true
}
@@ -184,10 +177,8 @@ func TestCPUTimeEndToEnd(t *testing.T) {
distSQLKnobs := &execinfra.TestingKnobs{}
distSQLKnobs.ForceDiskSpill = true
testClusterArgs.ServerArgs.Knobs.DistSQL = distSQLKnobs
testClusterArgs.ServerArgs.Insecure = true
const numNodes = 3

serverutils.InitTestServerFactory(server.TestServerFactory)
tc := testcluster.StartTestCluster(t, numNodes, testClusterArgs)
ctx := context.Background()
defer tc.Stopper().Stop(ctx)
@@ -245,3 +236,123 @@ func TestCPUTimeEndToEnd(t *testing.T) {
runQuery("SELECT * FROM (VALUES (1), (2), (3)) v(a) INNER LOOKUP JOIN t ON a = x", false /* hideCPU */)
runQuery("SELECT count(*) FROM generate_series(1, 100000)", false /* hideCPU */)
}

// TestContentionTimeOnWrites verifies that contention encountered during a
// mutation is reported in EXPLAIN ANALYZE output.
func TestContentionTimeOnWrites(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
ctx := context.Background()
defer tc.Stopper().Stop(ctx)

runner := sqlutils.MakeSQLRunner(tc.Conns[0])
runner.Exec(t, "CREATE TABLE t (k INT PRIMARY KEY, v INT)")

// The test involves three goroutines:
// - the main goroutine acts as the coordinator. It first waits for worker 1
// to perform its mutation in an open txn, then blocks until worker 2
// begins executing its mutation, then unblocks worker 1 and waits for
// both workers to exit.
// - worker 1 goroutine performs a mutation without committing a txn. It
// notifies the main goroutine by closing 'sem' once the mutation has been
// performed. It then blocks until 'commitCh' is closed by the main
// goroutine which allows worker 2 to experience contention.
// - worker 2 goroutine performs a mutation via EXPLAIN ANALYZE. This query
// will be blocked until worker 1 commits its txn, so it should see
// contention time reported in the output.

sem := make(chan struct{})
errCh := make(chan error, 1)
commitCh := make(chan struct{})
go func() {
defer close(errCh)
// Ensure that sem is always closed (in case we encounter an error
// before the mutation is performed).
var closedSem bool
defer func() {
if !closedSem {
close(sem)
}
}()
txn, err := tc.Conns[0].Begin()
if err != nil {
errCh <- err
return
}
_, err = txn.Exec("INSERT INTO t VALUES (1, 1)")
if err != nil {
errCh <- err
return
}
// Notify the main goroutine that the mutation has been performed.
close(sem)
closedSem = true
// Block until the main goroutine tells us that we're good to commit.
<-commitCh
if err = txn.Commit(); err != nil {
errCh <- err
return
}
}()

// Block until the mutation of worker 1 is done.
<-sem
// Check that no error was encountered before that.
select {
case err := <-errCh:
t.Fatal(err)
default:
}

var foundContention bool
errCh2 := make(chan error, 1)
go func() {
defer close(errCh2)
// Execute the mutation via EXPLAIN ANALYZE and check whether the
// contention is reported.
contentionRE := regexp.MustCompile(`cumulative time spent due to contention.*`)
rows := runner.Query(t, "EXPLAIN ANALYZE UPSERT INTO t VALUES (1, 2)")
for rows.Next() {
var line string
if err := rows.Scan(&line); err != nil {
errCh2 <- err
return
}
if contentionRE.MatchString(line) {
foundContention = true
}
}
}()

// Continuously poll SHOW CLUSTER QUERIES until we see that the query that
// should be experiencing contention has started executing.
for {
row := runner.QueryRow(t, "SELECT count(*) FROM [SHOW CLUSTER QUERIES] WHERE query LIKE '%EXPLAIN ANALYZE UPSERT%'")
var count int
row.Scan(&count)
// Sleep for a non-trivial amount of time to allow worker 2 to start
// (if it hasn't already) and to experience the contention (if it has
// started).
time.Sleep(time.Second)
if count == 2 {
// We stop polling once we see 2 queries matching the LIKE pattern:
// the mutation query from worker 2 and the polling query itself.
break
}
}

// Allow worker 1 to commit which should unblock both workers.
close(commitCh)

// Wait for both workers to exit. Also perform sanity checks that the
// workers didn't run into any errors.
err := <-errCh
require.NoError(t, err)
err = <-errCh2
require.NoError(t, err)

// Meat of the test: verify that the contention was reported.
require.True(t, foundContention)
}
12 changes: 7 additions & 5 deletions pkg/sql/plan_node_to_row_source.go
@@ -47,6 +47,7 @@ type planNodeToRowSource struct {
// run time state machine values
row rowenc.EncDatumRow

contentionEventsListener execstats.ContentionEventsListener
tenantConsumptionListener execstats.TenantConsumptionListener
}

@@ -168,7 +169,7 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS
}

func (p *planNodeToRowSource) Start(ctx context.Context) {
ctx = p.StartInternal(ctx, nodeName(p.node), &p.tenantConsumptionListener)
ctx = p.StartInternal(ctx, nodeName(p.node), &p.contentionEventsListener, &p.tenantConsumptionListener)
p.params.ctx = ctx
// This starts all of the nodes below this node.
if err := startExec(p.params, p.node); err != nil {
@@ -260,13 +261,14 @@ func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetad

// execStatsForTrace implements ProcessorBase.ExecStatsForTrace.
func (p *planNodeToRowSource) execStatsForTrace() *execinfrapb.ComponentStats {
// Propagate RUs from IO requests.
// TODO(drewk): we should consider propagating other stats for planNode
// operators.
if p.tenantConsumptionListener.ConsumedRU == 0 {
// Propagate contention time and RUs from IO requests.
if p.contentionEventsListener.CumulativeContentionTime == 0 && p.tenantConsumptionListener.ConsumedRU == 0 {
return nil
}
return &execinfrapb.ComponentStats{
KV: execinfrapb.KVStats{
ContentionTime: optional.MakeTimeValue(p.contentionEventsListener.CumulativeContentionTime),
},
Exec: execinfrapb.ExecStats{
ConsumedRU: optional.MakeUint(p.tenantConsumptionListener.ConsumedRU),
},
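For context, here is a minimal, self-contained sketch of the listener pattern wired up above. The type and field names mirror the diff, but the event type and the Notify hook are simplified assumptions, not CockroachDB's actual execstats API:

package main

import (
	"fmt"
	"time"
)

// ContentionEvent stands in for the structured event the KV layer emits
// when a request has to wait on a conflicting lock (simplified; the real
// event carries more detail, such as the contending transaction).
type ContentionEvent struct {
	Duration time.Duration
}

// ContentionEventsListener accumulates the total time spent waiting on
// contention, analogous to execstats.ContentionEventsListener above.
type ContentionEventsListener struct {
	CumulativeContentionTime time.Duration
}

// Notify is invoked once per structured event observed on the processor's
// tracing span; events other than contention events are ignored. The hook
// name and signature are assumptions for this sketch.
func (l *ContentionEventsListener) Notify(event interface{}) {
	if ce, ok := event.(*ContentionEvent); ok {
		l.CumulativeContentionTime += ce.Duration
	}
}

func main() {
	var l ContentionEventsListener
	// Simulate two contention events observed while a mutation executed.
	l.Notify(&ContentionEvent{Duration: 250 * time.Millisecond})
	l.Notify(&ContentionEvent{Duration: 100 * time.Millisecond})
	// execStatsForTrace only reports the total when it is non-zero.
	fmt.Printf("cumulative time spent due to contention: %s\n",
		l.CumulativeContentionTime)
}

As in the diff, the listener is registered when the processor starts (StartInternal above) and its accumulated total is read once when execution finishes, so the hot path costs only a duration addition per event.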
