Skip to content

Commit

Permalink
distsql: add logging to flow scheduler
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
asubiotto committed Jul 11, 2018
1 parent 4b7413a commit 4e47df4
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions pkg/sql/distsqlrun/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
"container/list"
"context"

"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const flowDoneChanSize = 8
Expand Down Expand Up @@ -54,8 +57,9 @@ type flowScheduler struct {
// TODO(asubiotto): Figure out if asynchronous flow execution can be rearranged
// to avoid the need to store the context.
type flowWithCtx struct {
ctx context.Context
flow *Flow
ctx context.Context
flow *Flow
enqueueTime time.Time
}

func newFlowScheduler(
Expand Down Expand Up @@ -88,6 +92,9 @@ func (fs *flowScheduler) canRunFlow(_ *Flow) bool {

// runFlowNow starts the given flow; does not wait for the flow to complete.
func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) error {
log.VEventf(
ctx, 1, "flow scheduler running flow %s, currently running %d", f.id, fs.mu.numRunning,
)
fs.mu.numRunning++
fs.metrics.FlowStart()
if err := f.Start(ctx, func() { fs.flowDoneCh <- f }); err != nil {
Expand Down Expand Up @@ -116,9 +123,11 @@ func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error {
if fs.canRunFlow(f) {
return fs.runFlowNow(ctx, f)
}
log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id)
fs.mu.queue.PushBack(&flowWithCtx{
ctx: ctx,
flow: f,
ctx: ctx,
flow: f,
enqueueTime: timeutil.Now(),
})
return nil

Expand Down Expand Up @@ -148,6 +157,9 @@ func (fs *flowScheduler) Start() {
if frElem := fs.mu.queue.Front(); frElem != nil {
n := frElem.Value.(*flowWithCtx)
fs.mu.queue.Remove(frElem)
log.VEventf(
n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, timeutil.Since(n.enqueueTime),
)
// Note: we use the flow's context instead of the worker
// context, to ensure that logging etc is relative to the
// specific flow.
Expand Down

0 comments on commit 4e47df4

Please sign in to comment.