distsql: make the number of DistSQL runners dynamic
This commit improves the infrastructure around the pool of "DistSQL
runners" that is used for issuing SetupFlow RPCs in parallel.
Previously, we had a hard-coded pool of 16 goroutines, which was likely
insufficient in many cases. This commit changes the default pool size
to `4 x N(cpus)` so that it is proportional to how beefy the node is
(under the expectation that the larger the node is, the more
distributed queries it will be handling). The multiplier of 4 was
chosen so that we keep the previous default of 16 on machines with
4 CPUs.
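
For concreteness, a minimal sketch of the new default computation (the
`defaultNumRunners` name is illustrative only; the actual registration
is in the diff below):

    package main

    import (
        "fmt"
        "runtime"
    )

    func main() {
        // GOMAXPROCS (rather than NumCPU) is used because it can be
        // adjusted for cgroup CPU limits.
        defaultNumRunners := 4 * int64(runtime.GOMAXPROCS(0))
        fmt.Println(defaultNumRunners) // 16 on a 4-CPU machine
    }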

Additionally, this commit introduces a mechanism to dynamically adjust
the number of runners based on a cluster setting. Whenever the setting
is reduced, some of the workers are stopped; whenever it is increased,
new workers are spun up accordingly. A coordinator goroutine listens on
two channels: one signaling that the server is quiescing, and another
carrying the new target pool size. Whenever a new target size is
received, the coordinator spins up or shuts down one worker at a time
until that target is reached. The workers, however, do not watch the
quiescence channel themselves; instead, each relies on the coordinator
to tell it to exit (either by closing the stop channel when quiescing
or by sending a single message on it when the target size is
decreased).
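
As an illustration, below is a minimal standalone sketch of the
coordinator loop described above, assuming a plain quiesce channel in
place of CockroachDB's stop.Stopper; all names are illustrative, and
the actual implementation lives in pkg/sql/distsql_running.go in the
diff below.

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    func main() {
        runnerChan := make(chan func())       // unbuffered: a send succeeds only if a worker is idle
        newDesiredNumWorkers := make(chan int64, 4)
        quiesce := make(chan struct{})        // closed on server shutdown
        stopWorkerChan := make(chan struct{}) // tells individual workers to exit
        var numWorkers int64

        worker := func() {
            for {
                select {
                case req := <-runnerChan:
                    req()
                case <-stopWorkerChan:
                    return
                }
            }
        }

        // The coordinator adjusts the pool one worker at a time until
        // the most recently requested target size is reached.
        go func() {
            defer close(stopWorkerChan) // stops all remaining workers on exit
            for {
                select {
                case target := <-newDesiredNumWorkers:
                    for atomic.LoadInt64(&numWorkers) != target {
                        if atomic.LoadInt64(&numWorkers) < target {
                            go worker()
                            atomic.AddInt64(&numWorkers, 1)
                        } else {
                            select {
                            case stopWorkerChan <- struct{}{}: // one worker exits
                                atomic.AddInt64(&numWorkers, -1)
                            case <-quiesce:
                                return
                            }
                        }
                    }
                case <-quiesce:
                    return
                }
            }
        }()

        newDesiredNumWorkers <- 4 // e.g. the initial value of the setting
        time.Sleep(50 * time.Millisecond)
        fmt.Println("workers:", atomic.LoadInt64(&numWorkers)) // 4
        newDesiredNumWorkers <- 1 // shrink the pool
        time.Sleep(50 * time.Millisecond)
        fmt.Println("workers:", atomic.LoadInt64(&numWorkers)) // 1
        close(quiesce)
    }

In the actual change the target size comes from the new
sql.distsql.num_runners cluster setting (e.g. SET CLUSTER SETTING
sql.distsql.num_runners = 32), as exercised by
TestDistSQLRunnerCoordinator below.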

Release note: None
yuzefovich committed Jul 27, 2022
1 parent 658bf2b commit e77f377
Showing 3 changed files with 154 additions and 29 deletions.
8 changes: 4 additions & 4 deletions pkg/sql/distsql_physical_planner.go
@@ -91,9 +91,9 @@ type DistSQLPlanner struct {
distSQLSrv *distsql.ServerImpl
spanResolver physicalplan.SpanResolver

// runnerChan is used to send out requests (for running SetupFlow RPCs) to a
// pool of workers.
runnerChan chan runnerRequest
// runnerCoordinator is used to send out requests (for running SetupFlow
// RPCs) to a pool of workers.
runnerCoordinator runnerCoordinator

// cancelFlowsCoordinator is responsible for batching up the requests to
// cancel remote flows initiated on the behalf of the current node when the
@@ -211,7 +211,7 @@ func NewDistSQLPlanner(
rpcCtx.Stopper.AddCloser(dsp.parallelLocalScansSem.Closer("stopper"))
}

dsp.initRunners(ctx)
dsp.runnerCoordinator.init(ctx, stopper, &st.SV)
dsp.initCancelingWorkers(ctx)
return dsp
}
144 changes: 119 additions & 25 deletions pkg/sql/distsql_running.go
@@ -14,6 +14,7 @@ import (
"context"
"fmt"
"math"
"runtime"
"sync"
"sync/atomic"
"time"
@@ -26,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/colflow"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/contentionpb"
@@ -50,18 +52,36 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/ring"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
)

// To allow queries to send out flow RPCs in parallel, we use a pool of workers
// that can issue the RPCs on behalf of the running code. The pool is shared by
// multiple queries.
const numRunners = 16
var settingDistSQLNumRunners = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.distsql.num_runners",
"determines the number of DistSQL runner goroutines used for issuing SetupFlow RPCs",
// We use GOMAXPROCS instead of NumCPU because the former could be adjusted
// based on cgroup limits (see cgroups.AdjustMaxProcs).
//
// The choice of the default multiple of 4 was made in order to get the
// original value of 16 on machines with 4 CPUs.
4*int64(runtime.GOMAXPROCS(0)), /* defaultValue */
func(v int64) error {
if v < 0 {
return errors.Errorf("cannot be set to a negative value: %d", v)
}
if v > distSQLNumRunnersMax {
return errors.Errorf("cannot be set to a value exceeding %d: %d", distSQLNumRunnersMax, v)
}
return nil
},
)

const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
// Somewhat arbitrary upper bound.
var distSQLNumRunnersMax = 256 * int64(runtime.GOMAXPROCS(0))

// runnerRequest is the request that is sent (via a channel) to a worker.
type runnerRequest struct {
@@ -101,32 +121,104 @@ func (req runnerRequest) run() {
req.resultChan <- res
}

func (dsp *DistSQLPlanner) initRunners(ctx context.Context) {
type runnerCoordinator struct {
// runnerChan is used by the DistSQLPlanner to send out requests (for
// running SetupFlow RPCs) to a pool of workers.
runnerChan chan runnerRequest
// newDesiredNumWorkers is used to notify the coordinator that the size of
// the pool of workers might have changed.
newDesiredNumWorkers chan int64
atomics struct {
// numWorkers tracks the number of workers running at the moment. This
// needs to be accessed atomically, but only because of the usage in
// tests.
numWorkers int64
}
}

func (c *runnerCoordinator) init(ctx context.Context, stopper *stop.Stopper, sv *settings.Values) {
// This channel has to be unbuffered because we want to only be able to send
// requests if a worker is actually there to receive them.
dsp.runnerChan = make(chan runnerRequest)
for i := 0; i < numRunners; i++ {
_ = dsp.stopper.RunAsyncTask(ctx, "distsql-runner", func(context.Context) {
runnerChan := dsp.runnerChan
stopChan := dsp.stopper.ShouldQuiesce()
for {
select {
case req := <-runnerChan:
req.run()

case <-stopChan:
return
}
c.runnerChan = make(chan runnerRequest)
stopWorkerChan := make(chan struct{})
worker := func(context.Context) {
for {
select {
case req := <-c.runnerChan:
req.run()
case <-stopWorkerChan:
return
}
})
}
}
stopChan := stopper.ShouldQuiesce()
// This is a buffered channel because we will be sending on it from the
// callback when the corresponding setting changes. The buffer size of 1
// should be sufficient, but we use a larger buffer out of caution (in case
// the cluster setting is updated rapidly) - in order to not block the
// goroutine that is updating the settings.
c.newDesiredNumWorkers = make(chan int64, 4)
// setNewNumWorkers sets the new target size of the pool of workers.
setNewNumWorkers := func(newNumWorkers int64) {
select {
case c.newDesiredNumWorkers <- newNumWorkers:
case <-stopChan:
// If the server is quiescing, then the new size of the pool doesn't
// matter.
return
}
}
// Whenever the corresponding setting is updated, we need to notify the
// coordinator.
// NB: runnerCoordinator.init is called once per server lifetime so this
// won't leak an unbounded number of OnChange callbacks.
settingDistSQLNumRunners.SetOnChange(sv, func(ctx context.Context) {
setNewNumWorkers(settingDistSQLNumRunners.Get(sv))
})
// We need to set the target pool size based on the current setting
// explicitly since the OnChange callback won't ever be called for the
// initial value - the setting initialization has already been performed
// before we registered the OnChange callback.
setNewNumWorkers(settingDistSQLNumRunners.Get(sv))
// Spin up the coordinator goroutine.
_ = stopper.RunAsyncTask(ctx, "distsql-runner-coordinator", func(context.Context) {
// Make sure to stop all workers when the coordinator exits.
defer close(stopWorkerChan)
for {
select {
case newNumWorkers := <-c.newDesiredNumWorkers:
for {
numWorkers := atomic.LoadInt64(&c.atomics.numWorkers)
if numWorkers == newNumWorkers {
break
}
if numWorkers < newNumWorkers {
// Need to spin another worker.
err := stopper.RunAsyncTask(ctx, "distsql-runner", worker)
if err != nil {
return
}
atomic.AddInt64(&c.atomics.numWorkers, 1)
} else {
// Need to stop one of the workers.
select {
case stopWorkerChan <- struct{}{}:
atomic.AddInt64(&c.atomics.numWorkers, -1)
case <-stopChan:
return
}
}
}
case <-stopChan:
return
}
}
})
}

// To allow for canceling flows via CancelDeadFlows RPC on different nodes
// simultaneously, we use a pool of workers. It is likely that these workers
// will be less busy than SetupFlow runners, so we instantiate smaller number of
// the canceling workers.
const numCancelingWorkers = numRunners / 4
// simultaneously, we use a pool of workers.
const numCancelingWorkers = 4

func (dsp *DistSQLPlanner) initCancelingWorkers(initCtx context.Context) {
dsp.cancelFlowsCoordinator.workerWait = make(chan struct{}, numCancelingWorkers)
@@ -341,7 +433,7 @@ func (dsp *DistSQLPlanner) setupFlows(
// Send out a request to the workers; if no worker is available, run
// directly.
select {
case dsp.runnerChan <- runReq:
case dsp.runnerCoordinator.runnerChan <- runReq:
default:
runReq.run()
}
@@ -373,6 +465,8 @@ func (dsp *DistSQLPlanner) setupFlows(
return dsp.distSQLSrv.SetupLocalSyncFlow(ctx, evalCtx.Mon, &setupReq, recv, batchReceiver, localState)
}

const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"

// Run executes a physical plan. The plan should have been finalized using
// FinalizePlan.
//
31 changes: 31 additions & 0 deletions pkg/sql/distsql_running_test.go
@@ -676,3 +676,34 @@ func TestDistSQLReceiverCancelsDeadFlows(t *testing.T) {
return nil
})
}

// TestDistSQLRunnerCoordinator verifies that the runnerCoordinator correctly
// reacts to the changes of the corresponding setting.
func TestDistSQLRunnerCoordinator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)

runner := &s.ExecutorConfig().(ExecutorConfig).DistSQLPlanner.runnerCoordinator
sqlDB := sqlutils.MakeSQLRunner(db)

checkNumRunners := func(newNumRunners int64) {
sqlDB.Exec(t, fmt.Sprintf("SET CLUSTER SETTING sql.distsql.num_runners = %d", newNumRunners))
testutils.SucceedsSoon(t, func() error {
numWorkers := atomic.LoadInt64(&runner.atomics.numWorkers)
if numWorkers != newNumRunners {
return errors.Newf("%d workers are up, want %d", numWorkers, newNumRunners)
}
return nil
})
}

// Lower the setting to 0 and make sure that all runners exit.
checkNumRunners(0)

// Now bump it up to 100.
checkNumRunners(100)
}
