81995: colexec: derive a tracing span in materializer when collecting stats r=yuzefovich a=yuzefovich

If we don't do this, then the stats would be attached to the span of
the materializer's user, and if that user itself has a lot of
payloads to attach (e.g. a joinReader attaching the KV keys it looked
up), then the stats might be dropped based on the maximum size of
structured payload per tracing span of 10KiB (see
`tracing.maxStructuredBytesPerSpan`). Deriving a separate span
guarantees that the stats won't be dropped.

This required some changes to keep a test that makes too many
assumptions about the tracing infrastructure working.

Release note: None

84358: sql/tests: TestRandomSyntaxSchemaChangeColumn use a resettable timeout r=fqazi a=fqazi

fixes cockroachdb#65736

Previously, TestRandomSyntaxSchemaChangeColumn had a fixed
timeout, which meant that if schema changes got stuck behind
each other, the timeout might not be sufficient. We previously
tried bumping up this timeout, but that is not a reliable fix for
this test. To address this, this patch introduces the concept of
resettable timeouts: the timeout expires only if no other statement
completes within the given window (otherwise, it is recalculated
from the completion of the last statement). To avoid potential
starvation, there is a limit on the number of resets, which
guarantees eventual expiry if a query is always bypassed.
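
A minimal sketch of the resettable-timeout idea (illustrative only; `waitResettable` and `lastCompleted` are hypothetical names, not the test's actual helper, which lives in `verifyFormatDB.execWithResettableTimeout` below):

```go
package rsgutil

import (
	"errors"
	"time"
)

// waitResettable waits for done, but whenever the timer fires it checks
// whether some other statement completed within the window and, if so,
// re-arms the timer so that the deadline is measured from that completion.
// maxResets bounds the number of extensions so that a query that is always
// bypassed still times out eventually.
func waitResettable(
	done <-chan error, lastCompleted func() time.Time, timeout time.Duration, maxResets int,
) error {
	wait := timeout
	for {
		select {
		case err := <-done:
			return err
		case <-time.After(wait):
			if maxResets > 0 && time.Since(lastCompleted()) < timeout {
				// Re-arm: expire `timeout` after the most recent completion.
				wait = timeout - time.Since(lastCompleted())
				if wait < time.Second {
					wait = time.Second // avoid overly tight re-checks
				}
				maxResets--
				continue
			}
			return errors.New("statement exec timeout")
		}
	}
}
```

Bounding the resets is what prevents starvation: even if other statements keep completing, the deadline can only be extended maxResets times.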

Release note: None

85081: ui/cluster-ui: add wait time insights to active executions r=xinhaoz a=xinhaoz

Closes: cockroachdb#79127
Closes: cockroachdb#79131

This commit introduces a new section, 'Wait Time Insights', to the
active stmts/txns details pages. The section is included if the txn
being viewed is experiencing contention and includes info on the
blocked schema, table, and index name, the time spent blocking, and
the executions blocking or waiting for the viewed execution.

The section is powered by querying the `crdb_internal.cluster_locks`
table via the SQL api in `/api/v2/`. The table information is
refreshed at an interval of 10s while on active execution pages,
and is requested along with session information (to give info on
active txns/stmts).
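
For reference, the same contention data can be pulled with any SQL client; below is a rough Go sketch (the column list and the connection string are assumptions for illustration, not the exact query the console issues):

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the PostgreSQL wire protocol.
)

func main() {
	// Placeholder connection string; adjust for your cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Column names are assumptions based on the commit description
	// (blocked schema/table/index, wait time, and contending txns).
	rows, err := db.Query(`
SELECT database_name, schema_name, table_name, index_name,
       txn_id, granted, contended, duration
FROM crdb_internal.cluster_locks
WHERE contended`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	for rows.Next() {
		var dbName, schema, table, index, txnID, duration sql.NullString
		var granted, contended sql.NullBool
		if err := rows.Scan(
			&dbName, &schema, &table, &index, &txnID, &granted, &contended, &duration,
		); err != nil {
			log.Fatal(err)
		}
		fmt.Println(dbName.String, table.String, index.String, txnID.String,
			granted.Bool, duration.String)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}
```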

Only users having VIEWACTIVITY or higher permissions can view this
feature.

Refactor note: to remove duplication across shared selector logic
in the active txn components, `activeExecutionsCommon.selectors.ts`
has been created. This file exports combiner functions for active
txn selectors. The combiner function step contains the bulk of the
logic to transform data from the redux state into component props,
so future changes to this logic will not need to be duplicated
across packages.

Release note (ui change): A new section, 'Wait Time Insights', has
been added to the active statement and transaction details pages.
The section is included if the txn being viewed is experiencing
contention and includes info on the blocked schema, table, and
index name, the time spent blocking, and the txns blocking or
waiting for the viewed txn. Only users having `VIEWACTIVITY` or
higher can view this feature. The column 'Time Spent Waiting' has
been added to the active executions tables, which shows the total
amount of time an execution has been waiting for a lock.

--------------
### Note to reviewers: only the 2nd commit is relevant, see first commit here: cockroachdb#85080

https://www.loom.com/share/53d8a74f9c9041a9b967e32dc370a153

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
Co-authored-by: Xin Hao Zhang <[email protected]>
4 people committed Aug 11, 2022
4 parents fb8b849 + 8303dda + d2f78ea + 3556435 commit e011dd4
Showing 44 changed files with 1,427 additions and 633 deletions.
39 changes: 28 additions & 11 deletions pkg/sql/colexec/materializer.go
@@ -265,7 +265,19 @@ func (m *Materializer) OutputTypes() []*types.T {

// Start is part of the execinfra.RowSource interface.
func (m *Materializer) Start(ctx context.Context) {
ctx = m.StartInternalNoSpan(ctx)
if len(m.drainHelper.statsCollectors) > 0 {
// Since we're collecting stats, we'll derive a separate tracing span
// for them. If we don't do this, then the stats would be attached to
// the span of the materializer's user, and if that user itself has a
// lot of payloads to attach (e.g. a joinReader attaching the KV keys it
// looked up), then the stats might be dropped based on the maximum size
// of structured payload per tracing span of 10KiB (see
// tracing.maxStructuredBytesPerSpan). Deriving a separate span
// guarantees that the stats won't be dropped.
ctx = m.StartInternal(ctx, "materializer" /* name */)
} else {
ctx = m.StartInternalNoSpan(ctx)
}
// We can encounter an expected error during Init (e.g. an operator
// attempts to allocate a batch, but the memory budget limit has been
// reached), so we need to wrap it with a catcher.
@@ -335,17 +347,22 @@ func (m *Materializer) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata
}

func (m *Materializer) close() {
if m.InternalClose() {
if m.Ctx == nil {
// In some edge cases (like when Init of an operator above this
// materializer encounters a panic), the materializer might never be
// started, yet it still will attempt to close its Closers. This
// context is only used for logging purposes, so it is ok to grab
// the background context in order to prevent a NPE below.
m.Ctx = context.Background()
}
m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
if m.Closed {
return
}
if m.Ctx == nil {
// In some edge cases (like when Init of an operator above this
// materializer encounters a panic), the materializer might never be
// started, yet it still will attempt to close its Closers. This
// context is only used for logging purposes, so it is ok to grab
// the background context in order to prevent a NPE below.
m.Ctx = context.Background()
}
// Make sure to call InternalClose() only after closing the closers - this
// allows the closers to utilize the unfinished tracing span (if tracing is
// enabled).
m.closers.CloseAndLogOnErr(m.Ctx, "materializer")
m.InternalClose()
}

// ConsumerClosed is part of the execinfra.RowSource interface.
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -1485,6 +1485,10 @@ type ExecutorTestingKnobs struct {
// to use a transaction, and, in doing so, more deterministically allocate
// descriptor IDs at the cost of decreased parallelism.
UseTransactionalDescIDGenerator bool

// NoStatsCollectionWithVerboseTracing is used to disable the execution
// statistics collection in the presence of verbose tracing.
NoStatsCollectionWithVerboseTracing bool
}

// PGWireTestingKnobs contains knobs for the pgwire module.
2 changes: 1 addition & 1 deletion pkg/sql/instrumentation.go
@@ -244,7 +244,7 @@ func (ih *instrumentationHelper) Setup(
statsCollector.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, p.SessionData().Database)

if sp := tracing.SpanFromContext(ctx); sp != nil {
if sp.IsVerbose() {
if sp.IsVerbose() && !cfg.TestingKnobs.NoStatsCollectionWithVerboseTracing {
// If verbose tracing was enabled at a higher level, stats
// collection is enabled so that stats are shown in the traces, but
// no extra work is needed by the instrumentationHelper.
164 changes: 112 additions & 52 deletions pkg/sql/tests/rsg_test.go
@@ -53,7 +53,7 @@ var (
flagRSGTime = flag.Duration("rsg", 0, "random syntax generator test duration")
flagRSGGoRoutines = flag.Int("rsg-routines", 1, "number of Go routines executing random statements in each RSG test")
flagRSGExecTimeout = flag.Duration("rsg-exec-timeout", 15*time.Second, "timeout duration when executing a statement")
flagRSGExecColumnChangeTimeout = flag.Duration("rsg-exec-column-change-timeout", 2*time.Minute, "timeout duration when executing a statement for random column changes")
flagRSGExecColumnChangeTimeout = flag.Duration("rsg-exec-column-change-timeout", 20*time.Second, "timeout duration when executing a statement for random column changes")
)

func verifyFormat(sql string) error {
@@ -83,6 +83,9 @@ type verifyFormatDB struct {
syncutil.Mutex
// active holds the currently executing statements.
active map[string]int
// lastCompletedStmt tracks the time when the last statement finished
// executing, which will be used for resettable timeouts.
lastCompletedStmt time.Time
}
}

@@ -98,6 +101,7 @@ func (db *verifyFormatDB) Incr(sql string) func() {
return func() {
db.mu.Lock()
db.mu.active[sql]--
db.mu.lastCompletedStmt = timeutil.Now()
if db.mu.active[sql] == 0 {
delete(db.mu.active, sql)
}
@@ -128,6 +132,21 @@ func (db *verifyFormatDB) exec(t *testing.T, ctx context.Context, sql string) er
}
func (db *verifyFormatDB) execWithTimeout(
t *testing.T, ctx context.Context, sql string, duration time.Duration,
) error {
return db.execWithResettableTimeout(t,
ctx,
sql,
duration,
0 /* no resets allowed */)
}

// execWithResettableTimeout executes a statement with a timeout; if the
// timeout is resettable, then it will be reset every time a query completes.
// This is specifically used in cases where multiple things might be executed
// serially, for example schema changes on the same table. maxResets can be
// used to guarantee we don't endlessly extend the timeout.
func (db *verifyFormatDB) execWithResettableTimeout(
t *testing.T, ctx context.Context, sql string, duration time.Duration, maxResets int,
) error {
if err := func() (retErr error) {
defer func() {
@@ -154,64 +173,98 @@ func (db *verifyFormatDB) execWithTimeout(
_, err := db.db.ExecContext(ctx, sql)
funcdone <- err
}()
select {
case err := <-funcdone:
if err != nil {
if pqerr := (*pq.Error)(nil); errors.As(err, &pqerr) {
// Output Postgres error code if it's available.
if pgcode.MakeCode(string(pqerr.Code)) == pgcode.CrashShutdown {
return &crasher{
sql: sql,
err: err,
detail: pqerr.Detail,
retry := true
targetDuration := duration
for retry {
retry = false
err := func() error {
select {
case err := <-funcdone:
if err != nil {
if pqerr := (*pq.Error)(nil); errors.As(err, &pqerr) {
// Output Postgres error code if it's available.
if pgcode.MakeCode(string(pqerr.Code)) == pgcode.CrashShutdown {
return &crasher{
sql: sql,
err: err,
detail: pqerr.Detail,
}
}
}
if es := err.Error(); strings.Contains(es, "internal error") ||
strings.Contains(es, "driver: bad connection") ||
strings.Contains(es, "unexpected error inside CockroachDB") {
return &crasher{
sql: sql,
err: err,
}
}
return &nonCrasher{sql: sql, err: err}
}
return nil
case <-time.After(targetDuration):
db.mu.Lock()
defer db.mu.Unlock()
// In the resettable mode, we are going to wait for no progress on any
// queries before declaring this a hang.
if maxResets > 0 {
if db.mu.lastCompletedStmt.Add(duration).After(timeutil.Now()) {
// Recompute the timeout duration so that the timeout is
// N seconds after the last query's completion. This is done to make
// the timeouts between queries more reasonable for long intervals:
// (1) => Executes work in 1 second (setting the last completed query)
// (2) => Times out after 2 minutes
// If we simply wait the duration for (2) then we will incur another
// 2 minute wait and miss potential hangs (if the test times out first).
// Whereas this approach will wait 2 minutes after the completion of
// (1), only waiting an extra second more.
targetDuration = duration - db.mu.lastCompletedStmt.Add(duration).Sub(timeutil.Now())
// Avoid having super tight spins, wait at least a second.
if targetDuration <= time.Second {
targetDuration = time.Second
}
retry = true
maxResets -= 1
return nil
}
}
b := make([]byte, 1024*1024)
n := runtime.Stack(b, true)
t.Logf("%s\n", b[:n])
// Now see if we can execute a SELECT 1. This is useful because sometimes an
// exec timeout is because of a slow-executing statement, and other times
// it's because the server is completely wedged. This is an automated way
// to find out.
errch := make(chan error, 1)
go func() {
rows, err := db.db.Query(`SELECT 1`)
if err == nil {
rows.Close()
}
errch <- err
}()
select {
case <-time.After(5 * time.Second):
t.Log("SELECT 1 timeout: probably a wedged server")
case err := <-errch:
if err != nil {
t.Log("SELECT 1 execute error:", err)
} else {
t.Log("SELECT 1 executed successfully: probably a slow statement")
}
}
}
if es := err.Error(); strings.Contains(es, "internal error") ||
strings.Contains(es, "driver: bad connection") ||
strings.Contains(es, "unexpected error inside CockroachDB") {
return &crasher{
sql: sql,
err: err,
sql: sql,
err: errors.Newf("statement exec timeout"),
detail: fmt.Sprintf("timeout: %q. currently executing: %v", sql, db.mu.active),
}
}
return &nonCrasher{sql: sql, err: err}
}
return nil
case <-time.After(duration):
db.mu.Lock()
defer db.mu.Unlock()
b := make([]byte, 1024*1024)
n := runtime.Stack(b, true)
t.Logf("%s\n", b[:n])
// Now see if we can execute a SELECT 1. This is useful because sometimes an
// exec timeout is because of a slow-executing statement, and other times
// it's because the server is completely wedged. This is an automated way
// to find out.
errch := make(chan error, 1)
go func() {
rows, err := db.db.Query(`SELECT 1`)
if err == nil {
rows.Close()
}
errch <- err
}()
select {
case <-time.After(5 * time.Second):
t.Log("SELECT 1 timeout: probably a wedged server")
case err := <-errch:
if err != nil {
t.Log("SELECT 1 execute error:", err)
} else {
t.Log("SELECT 1 executed successfully: probably a slow statement")
}
}
return &crasher{
sql: sql,
err: errors.Newf("statement exec timeout"),
detail: fmt.Sprintf("timeout: %q. currently executing: %v", sql, db.mu.active),
if err != nil {
return err
}
}
return nil
}

func TestRandomSyntaxGeneration(t *testing.T) {
@@ -421,7 +474,14 @@ func TestRandomSyntaxSchemaChangeColumn(t *testing.T) {
}, func(ctx context.Context, db *verifyFormatDB, r *rsg.RSG) error {
n := r.Intn(len(roots))
s := fmt.Sprintf("ALTER TABLE ident.ident %s", r.Generate(roots[n], 500))
return db.execWithTimeout(t, ctx, s, *flagRSGExecColumnChangeTimeout)
// Execute with a resettable timeout, where we allow up to N go-routines worth
// of resets. This should be the maximum theoretical time we can get
// stuck behind other work.
return db.execWithResettableTimeout(t,
ctx,
s,
*flagRSGExecColumnChangeTimeout,
*flagRSGGoRoutines)
})
}
