Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: add flow setup cluster settings and improve debuggability #27404

Merged
merged 2 commits into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
<tr><td><code>sql.defaults.distsql</code></td><td>enumeration</td><td><code>1</code></td><td>Default distributed SQL execution mode [off = 0, auto = 1, on = 2]</td></tr>
<tr><td><code>sql.defaults.optimizer</code></td><td>enumeration</td><td><code>1</code></td><td>Default cost-based optimizer mode [off = 0, on = 1, local = 2]</td></tr>
<tr><td><code>sql.distsql.distribute_index_joins</code></td><td>boolean</td><td><code>true</code></td><td>if set, for index joins we instantiate a join reader on every node that has a stream; if not set, we use a single join reader</td></tr>
<tr><td><code>sql.distsql.flow_stream_timeout</code></td><td>duration</td><td><code>10s</code></td><td>amount of time incoming streams wait for a flow to be set up before erroring out</td></tr>
<tr><td><code>sql.distsql.interleaved_joins.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set we plan interleaved table joins instead of merge joins when possible</td></tr>
<tr><td><code>sql.distsql.max_running_flows</code></td><td>integer</td><td><code>500</code></td><td>maximum number of concurrent flows that can be run on a node</td></tr>
<tr><td><code>sql.distsql.merge_joins.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, we plan merge joins when possible</td></tr>
<tr><td><code>sql.distsql.temp_storage.joins</code></td><td>boolean</td><td><code>true</code></td><td>set to true to enable use of disk for distributed sql joins</td></tr>
<tr><td><code>sql.distsql.temp_storage.sorts</code></td><td>boolean</td><td><code>true</code></td><td>set to true to enable use of disk for distributed sql sorts</td></tr>
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func (f *Flow) Start(ctx context.Context, doneFn func()) error {
f.waitGroup.Add(len(f.inboundStreams))

if err := f.flowRegistry.RegisterFlow(
ctx, f.id, f, f.inboundStreams, flowStreamDefaultTimeout,
ctx, f.id, f, f.inboundStreams, settingFlowStreamTimeout.Get(&f.FlowCtx.Settings.SV),
); err != nil {
if f.syncFlowConsumer != nil {
// For sync flows, the error goes to the consumer.
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/distsqlrun/flow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
)

// flowStreamDefaultTimeout is the amount of time incoming streams wait for a flow to
// be set up before erroring out.
const flowStreamDefaultTimeout time.Duration = 10 * time.Second
// settingFlowStreamTimeout is the cluster setting
// "sql.distsql.flow_stream_timeout": the amount of time incoming streams wait
// for a flow to be set up before erroring out. It is registered as a
// non-negative duration setting with a default of 10 seconds.
var settingFlowStreamTimeout = settings.RegisterNonNegativeDurationSetting(
"sql.distsql.flow_stream_timeout",
"amount of time incoming streams wait for a flow to be set up before erroring out",
10*time.Second,
)

// expectedConnectionTime is the expected time taken by a flow to connect to its
// consumers.
Expand Down
12 changes: 7 additions & 5 deletions pkg/sql/distsqlrun/flow_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ func TestFlowRegistry(t *testing.T) {
t.Error("looked up unregistered flow")
}

const flowStreamTimeout = 10 * time.Second

ctx := context.Background()
if err := reg.RegisterFlow(
ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout,
ctx, id1, f1, nil /* inboundStreams */, flowStreamTimeout,
); err != nil {
t.Fatal(err)
}
Expand All @@ -117,7 +119,7 @@ func TestFlowRegistry(t *testing.T) {
go func() {
time.Sleep(jiffy)
if err := reg.RegisterFlow(
ctx, id1, f1, nil /* inboundStreams */, flowStreamDefaultTimeout,
ctx, id1, f1, nil /* inboundStreams */, flowStreamTimeout,
); err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -152,7 +154,7 @@ func TestFlowRegistry(t *testing.T) {

time.Sleep(jiffy)
if err := reg.RegisterFlow(
ctx, id2, f2, nil /* inboundStreams */, flowStreamDefaultTimeout,
ctx, id2, f2, nil /* inboundStreams */, flowStreamTimeout,
); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -181,7 +183,7 @@ func TestFlowRegistry(t *testing.T) {

wg1.Wait()
if err := reg.RegisterFlow(
ctx, id3, f3, nil /* inboundStreams */, flowStreamDefaultTimeout,
ctx, id3, f3, nil /* inboundStreams */, flowStreamTimeout,
); err != nil {
t.Fatal(err)
}
Expand All @@ -192,7 +194,7 @@ func TestFlowRegistry(t *testing.T) {
go func() {
time.Sleep(jiffy)
if err := reg.RegisterFlow(
ctx, id4, f4, nil /* inboundStreams */, flowStreamDefaultTimeout,
ctx, id4, f4, nil /* inboundStreams */, flowStreamTimeout,
); err != nil {
t.Error(err)
}
Expand Down
47 changes: 38 additions & 9 deletions pkg/sql/distsqlrun/flow_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,24 @@ import (
"container/list"
"context"

"time"

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const maxRunningFlows = 500
const flowDoneChanSize = 8

// settingMaxRunningFlows is the cluster setting
// "sql.distsql.max_running_flows": the maximum number of concurrent flows
// that can be run on a node. It defaults to 500.
//
// NOTE(review): this is registered as a plain int setting, so no lower bound
// is enforced here. Presumably a zero or negative value would prevent any new
// flow from starting immediately (all would be queued) — confirm whether a
// validated/positive setting is wanted.
var settingMaxRunningFlows = settings.RegisterIntSetting(
"sql.distsql.max_running_flows",
"maximum number of concurrent flows that can be run on a node",
500,
)

// flowScheduler manages running flows and decides when to queue and when to
// start flows. The main interface it presents is ScheduleFlows, which passes a
// flow to be run.
Expand All @@ -37,21 +47,26 @@ type flowScheduler struct {

mu struct {
syncutil.Mutex
numRunning int
queue *list.List
numRunning int
maxRunningFlows int
queue *list.List
}
}

// flowWithCtx stores a flow to run and a context to run it with.
// TODO(asubiotto): Figure out if asynchronous flow execution can be rearranged
// to avoid the need to store the context.
type flowWithCtx struct {
ctx context.Context
flow *Flow
ctx context.Context
flow *Flow
enqueueTime time.Time
}

func newFlowScheduler(
ambient log.AmbientContext, stopper *stop.Stopper, metrics *DistSQLMetrics,
ambient log.AmbientContext,
stopper *stop.Stopper,
settings *cluster.Settings,
metrics *DistSQLMetrics,
) *flowScheduler {
fs := &flowScheduler{
AmbientContext: ambient,
Expand All @@ -60,17 +75,26 @@ func newFlowScheduler(
metrics: metrics,
}
fs.mu.queue = list.New()
fs.mu.maxRunningFlows = int(settingMaxRunningFlows.Get(&settings.SV))
settingMaxRunningFlows.SetOnChange(&settings.SV, func() {
fs.mu.Lock()
fs.mu.maxRunningFlows = int(settingMaxRunningFlows.Get(&settings.SV))
fs.mu.Unlock()
})
return fs
}

func (fs *flowScheduler) canRunFlow(_ *Flow) bool {
// TODO(radu): we will have more complex resource accounting (like memory).
// For now we just limit the number of concurrent flows.
return fs.mu.numRunning < maxRunningFlows
return fs.mu.numRunning < fs.mu.maxRunningFlows
}

// runFlowNow starts the given flow; does not wait for the flow to complete.
func (fs *flowScheduler) runFlowNow(ctx context.Context, f *Flow) error {
log.VEventf(
ctx, 1, "flow scheduler running flow %s, currently running %d", f.id, fs.mu.numRunning,
)
fs.mu.numRunning++
fs.metrics.FlowStart()
if err := f.Start(ctx, func() { fs.flowDoneCh <- f }); err != nil {
Expand Down Expand Up @@ -99,9 +123,11 @@ func (fs *flowScheduler) ScheduleFlow(ctx context.Context, f *Flow) error {
if fs.canRunFlow(f) {
return fs.runFlowNow(ctx, f)
}
log.VEventf(ctx, 1, "flow scheduler enqueuing flow %s to be run later", f.id)
fs.mu.queue.PushBack(&flowWithCtx{
ctx: ctx,
flow: f,
ctx: ctx,
flow: f,
enqueueTime: timeutil.Now(),
})
return nil

Expand Down Expand Up @@ -131,6 +157,9 @@ func (fs *flowScheduler) Start() {
if frElem := fs.mu.queue.Front(); frElem != nil {
n := frElem.Value.(*flowWithCtx)
fs.mu.queue.Remove(frElem)
log.VEventf(
n.ctx, 1, "flow scheduler dequeued flow %s, spent %s in queue", n.flow.id, timeutil.Since(n.enqueueTime),
)
// Note: we use the flow's context instead of the worker
// context, to ensure that logging etc is relative to the
// specific flow.
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/distsqlrun/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func NewServer(ctx context.Context, cfg ServerConfig) *ServerImpl {
ServerConfig: cfg,
regexpCache: tree.NewRegexpCache(512),
flowRegistry: makeFlowRegistry(cfg.NodeID.Get()),
flowScheduler: newFlowScheduler(cfg.AmbientContext, cfg.Stopper, cfg.Metrics),
flowScheduler: newFlowScheduler(cfg.AmbientContext, cfg.Stopper, cfg.Settings, cfg.Metrics),
memMonitor: mon.MakeMonitor(
"distsql",
mon.MemoryResource,
Expand Down Expand Up @@ -487,7 +487,8 @@ func (ds *ServerImpl) flowStreamInt(ctx context.Context, stream DistSQL_FlowStre
log.Infof(ctx, "connecting inbound stream %s/%d", flowID.Short(), streamID)
}
f, receiver, cleanup, err := ds.flowRegistry.ConnectInboundStream(
ctx, flowID, streamID, stream, flowStreamDefaultTimeout)
ctx, flowID, streamID, stream, settingFlowStreamTimeout.Get(&ds.Settings.SV),
)
if err != nil {
return err
}
Expand Down