80494: server: add server-wide limit on addsstable send concurrency r=dt a=dt

While we generally prefer recipient-side limiting, which better utilizes all
available nodes' capacity, this adds an extra layer of limiting on the sender
side in case we find ourselves needing it. It defaults to unlimited.

Release note (sql change): The cluster setting bulkio.ingest.sender_concurrency_limit
can be used to adjust the concurrency at which any one SQL node, across all operations
that it is running, such as RESTOREs, IMPORTs, and schema changes, will send bulk ingest
requests to the KV storage layer.
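
A minimal sketch of the sender-side limiting pattern described above, using a
buffered channel as a counting semaphore. The `senderLimiter` type and the
"unlimited when 0" convention are illustrative assumptions; this is not
CockroachDB's actual `pkg/util/limit` API.

```go
package main

import (
	"context"
	"fmt"
)

// senderLimiter is a hypothetical, channel-based counting semaphore that caps
// how many bulk-ingest requests a single node sends to KV at once. A limit of
// 0 (or less) is treated as "unlimited", mirroring the setting's default.
type senderLimiter struct {
	slots chan struct{}
}

func newSenderLimiter(limit int) senderLimiter {
	if limit <= 0 {
		return senderLimiter{} // nil channel => no limiting
	}
	return senderLimiter{slots: make(chan struct{}, limit)}
}

// acquire blocks until a send slot is available (or ctx is cancelled) and
// returns a release function. With no limit configured it is a no-op.
func (l senderLimiter) acquire(ctx context.Context) (func(), error) {
	if l.slots == nil {
		return func() {}, nil
	}
	select {
	case l.slots <- struct{}{}:
		return func() { <-l.slots }, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	ctx := context.Background()
	lim := newSenderLimiter(2) // allow at most 2 concurrent bulk sends

	release, err := lim.acquire(ctx)
	if err != nil {
		panic(err)
	}
	defer release()
	fmt.Println("sending a bulk ingest request while holding a slot")
}
```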

Epic CRDB-2264

81177: ccl/sqlproxyccl: invoke rebalancing logic during RUNNING pod events r=JeffSwenson a=jaylim-crl

#### ccl/sqlproxyccl: invoke rebalancing logic during RUNNING pod events 

This commit invokes the rebalancing logic during RUNNING pod events as part of
the pod watcher. Since the rebalancing logic depends on the tenant directory,
the pod watcher will now only emit events once the directory has been updated.
This is done for better responsiveness: the moment a new SQL pod gets added, we
want to rebalance all of the tenant's connections.
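
A rough sketch of that ordering (apply the event to the directory first, then
trigger a rebalance for RUNNING pods). The `pod`, `directoryCache`, and
`watchPods` names are simplified stand-ins for illustration, not the actual
sqlproxyccl types.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a stand-in for a tenant SQL pod event from the directory server.
type pod struct {
	tenantID uint64
	addr     string
	state    string // e.g. "RUNNING", "DRAINING", "DELETING"
}

// directoryCache is a simplified stand-in for the proxy's tenant directory.
type directoryCache struct {
	mu   sync.Mutex
	pods map[uint64][]pod
}

func (d *directoryCache) update(p pod) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.pods[p.tenantID] = append(d.pods[p.tenantID], p)
}

// watchPods applies each event to the directory *before* handing it to the
// rebalance callback, so the balancer always sees an up-to-date pod list.
func watchPods(events <-chan pod, dir *directoryCache, rebalanceTenant func(uint64)) {
	for p := range events {
		dir.update(p)
		if p.state == "RUNNING" {
			rebalanceTenant(p.tenantID)
		}
	}
}

func main() {
	dir := &directoryCache{pods: make(map[uint64][]pod)}
	events := make(chan pod, 1)
	events <- pod{tenantID: 10, addr: "10.0.0.1:26257", state: "RUNNING"}
	close(events)

	watchPods(events, dir, func(id uint64) {
		fmt.Printf("rebalancing connections for tenant %d\n", id)
	})
}
```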

Note that the Watch endpoint on the tenant directory server currently emits
events in multiple cases: changes to load, and changes to pods (added/modified/
deleted). The plan is to update the tenant directory server to only emit events
for pod updates. The next commit will rate limit the number of times the
rebalancing logic can be called for a given tenant.

At the same time, we introduce a new static test directory server that does
not automatically spin up tenants for us (i.e. SQL pods for tenants can now
be managed manually, giving tests more control).

#### ccl/sqlproxyccl: rate limit the number of rebalances per tenant 

This commit rate limits the number of rebalances per tenant to once every
15 seconds (i.e. half of the rebalance loop interval). The main purpose of
this is to prevent a burst of pod events for the same tenant from triggering
multiple rebalances, which could move a lot of connections around.
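
A standalone sketch of that per-tenant throttle. It mirrors the
canRebalanceTenant logic added in the diff further down, but with simplified
types (plain uint64 tenant IDs and wall-clock time) rather than the balancer's
actual fields.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// rebalanceThrottle allows at most one rebalance per tenant every delay.
type rebalanceThrottle struct {
	mu      sync.Mutex
	delay   time.Duration
	tenants map[uint64]time.Time // tenant ID -> last rebalance time
}

func newRebalanceThrottle(delay time.Duration) *rebalanceThrottle {
	return &rebalanceThrottle{delay: delay, tenants: make(map[uint64]time.Time)}
}

// canRebalance returns true (and records the attempt) only if at least delay
// has elapsed since the tenant was last rebalanced.
func (t *rebalanceThrottle) canRebalance(tenantID uint64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	now := time.Now()
	if now.Sub(t.tenants[tenantID]) < t.delay {
		return false
	}
	t.tenants[tenantID] = now
	return true
}

func main() {
	// 15s is half of a 30s rebalance loop interval, as described above.
	throttle := newRebalanceThrottle(15 * time.Second)
	fmt.Println(throttle.canRebalance(10)) // true: first attempt goes through
	fmt.Println(throttle.canRebalance(10)) // false: throttled for 15s
}
```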

Release note: None

81582: sql: add is_grantable column to SHOW GRANTS FOR role r=richardjcai a=ecwall

refs #73394

Release note (sql change): Add an is_grantable column to
SHOW GRANTS FOR role to be consistent with other SHOW GRANTS
commands.

81776: bazel: bump size of `rangefeed` test r=rail a=rickystewart

This has timed out in CI.

Release note: None

Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Jay <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
5 people committed May 24, 2022
5 parents 2138d80 + 6e7b085 + 62021aa + 7e1253f + ba476f2 commit e2c163b
Showing 36 changed files with 2,828 additions and 1,979 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -254,6 +254,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -438,6 +438,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
if err != nil {
return summary, err
9 changes: 6 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -12,6 +12,7 @@ import (
"context"
"fmt"
"io/ioutil"
math "math"
"os"
"path/filepath"
"reflect"
@@ -42,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
@@ -247,9 +249,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil, opts...)
},
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &eval.Context{
Codec: keys.SystemSQLCodec,
171 changes: 117 additions & 54 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
@@ -38,6 +38,12 @@ const (
// DRAINING state before the proxy starts moving connections away from it.
minDrainPeriod = 1 * time.Minute

// defaultRebalanceDelay is the minimum amount of time that must elapse
// between rebalance operations. This was deliberately chosen to be half of
// rebalanceInterval, and is mainly used to rate limit effects due to events
// from the pod watcher.
defaultRebalanceDelay = 15 * time.Second

// rebalancePercentDeviation defines the percentage threshold that the
// current number of assignments can deviate away from the mean. Having a
// 15% "deadzone" reduces frequent transfers especially when load is
@@ -50,15 +56,15 @@ const (
// NOTE: This must be between 0 and 1 inclusive.
rebalancePercentDeviation = 0.15

// rebalanceRate defines the rate of rebalancing assignments across SQL
// pods. This rate applies to both RUNNING and DRAINING pods. For example,
// consider the case where the rate is 0.50; if we have decided that we need
// to move 15 assignments away from a particular pod, only 7 pods will be
// moved at a time.
// defaultRebalanceRate defines the rate of rebalancing assignments across
// SQL pods. This rate applies to both RUNNING and DRAINING pods. For
// example, consider the case where the rate is 0.50; if we have decided
// that we need to move 15 assignments away from a particular pod, only 7
// pods will be moved at a time.
//
// NOTE: This must be between 0 and 1 inclusive. 0 means no rebalancing
// will occur.
rebalanceRate = 0.50
defaultRebalanceRate = 0.50

// defaultMaxConcurrentRebalances represents the maximum number of
// concurrent rebalance requests that are being processed. This effectively
@@ -78,6 +84,8 @@ type balancerOptions struct {
maxConcurrentRebalances int
noRebalanceLoop bool
timeSource timeutil.TimeSource
rebalanceRate float32
rebalanceDelay time.Duration
}

// Option defines an option that can be passed to NewBalancer in order to
@@ -109,6 +117,25 @@ func TimeSource(ts timeutil.TimeSource) Option {
}
}

// RebalanceRate defines the rate of rebalancing across pods. This must be
// between 0 and 1 inclusive. 0 means no rebalancing will occur.
func RebalanceRate(rate float32) Option {
return func(opts *balancerOptions) {
opts.rebalanceRate = rate
}
}

// RebalanceDelay specifies the minimum amount of time that must elapse between
// attempts to rebalance a given tenant. This delay has the effect of throttling
// RebalanceTenant calls to avoid constantly moving connections around.
//
// RebalanceDelay defaults to defaultRebalanceDelay. Use -1 to never throttle.
func RebalanceDelay(delay time.Duration) Option {
return func(opts *balancerOptions) {
opts.rebalanceDelay = delay
}
}

// Balancer handles load balancing of SQL connections within the proxy.
// All methods on the Balancer instance are thread-safe.
type Balancer struct {
@@ -140,6 +167,22 @@ type Balancer struct {
// timeutil.DefaultTimeSource. Override with the TimeSource() option when
// calling NewBalancer.
timeSource timeutil.TimeSource

// rebalanceRate represents the rate of rebalancing connections.
rebalanceRate float32

// rebalanceDelay is the minimum amount of time that must elapse between
// attempts to rebalance a given tenant. Defaults to defaultRebalanceDelay.
rebalanceDelay time.Duration

// lastRebalance tracks the last time each tenant was rebalanced. This is
// used to rate limit the number of rebalances per tenant. Synchronization is
// needed since rebalance operations can be triggered by the rebalance loop,
// or the pod watcher.
lastRebalance struct {
syncutil.Mutex
tenants map[roachpb.TenantID]time.Time
}
}

// NewBalancer constructs a new Balancer instance that is responsible for
Expand All @@ -152,16 +195,15 @@ func NewBalancer(
opts ...Option,
) (*Balancer, error) {
// Handle options.
options := &balancerOptions{}
options := &balancerOptions{
maxConcurrentRebalances: defaultMaxConcurrentRebalances,
timeSource: timeutil.DefaultTimeSource{},
rebalanceRate: defaultRebalanceRate,
rebalanceDelay: defaultRebalanceDelay,
}
for _, opt := range opts {
opt(options)
}
if options.maxConcurrentRebalances == 0 {
options.maxConcurrentRebalances = defaultMaxConcurrentRebalances
}
if options.timeSource == nil {
options.timeSource = timeutil.DefaultTimeSource{}
}

// Ensure that ctx gets cancelled on stopper's quiescing.
ctx, _ = stopper.WithCancelOnQuiesce(ctx)
@@ -178,7 +220,11 @@ func NewBalancer(
queue: q,
processSem: semaphore.New(options.maxConcurrentRebalances),
timeSource: options.timeSource,
rebalanceRate: options.rebalanceRate,
rebalanceDelay: options.rebalanceDelay,
}
b.lastRebalance.tenants = make(map[roachpb.TenantID]time.Time)

b.connTracker, err = NewConnTracker(ctx, b.stopper, b.timeSource)
if err != nil {
return nil, err
@@ -199,6 +245,46 @@ func NewBalancer(
return b, nil
}

// RebalanceTenant rebalances connections to the given tenant. If no RUNNING
// pod exists for the given tenant, or the tenant has been recently rebalanced,
// this is a no-op.
func (b *Balancer) RebalanceTenant(ctx context.Context, tenantID roachpb.TenantID) {
// If rebalanced recently, no-op.
if !b.canRebalanceTenant(tenantID) {
return
}

tenantPods, err := b.directoryCache.TryLookupTenantPods(ctx, tenantID)
if err != nil {
log.Errorf(ctx, "could not rebalance tenant %s: %v", tenantID, err.Error())
return
}

// Construct a map so we could easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

if pod.State == tenant.RUNNING {
hasRunningPod = true
}
}

// Only attempt to rebalance if we have a RUNNING pod. In theory, this
// case would happen if we're scaling down from 1 to 0, which in that
// case, we can't transfer connections anywhere. Practically, we will
// never scale a tenant from 1 to 0 if there are still active
// connections, so this case should not occur.
if !hasRunningPod {
return
}

activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
}

// SelectTenantPod selects a tenant pod from the given list based on a weighted
// CPU load algorithm. It is expected that all pods within the list belong to
// the same tenant. If no pods are available, this returns ErrNoAvailablePods.
@@ -299,58 +385,35 @@ func (b *Balancer) rebalanceLoop(ctx context.Context) {
}
}

// canRebalanceTenant returns true if it has been at least `rebalanceDelay`
// since the last time the given tenant was rebalanced, or false otherwise.
func (b *Balancer) canRebalanceTenant(tenantID roachpb.TenantID) bool {
b.lastRebalance.Lock()
defer b.lastRebalance.Unlock()

now := b.timeSource.Now()
if now.Sub(b.lastRebalance.tenants[tenantID]) < b.rebalanceDelay {
return false
}
b.lastRebalance.tenants[tenantID] = now
return true
}

// rebalance attempts to rebalance connections for all tenants within the proxy.
//
// TODO(jaylim-crl): Update this to support rebalancing a single tenant. That
// way, the pod watcher could call this to rebalance a single tenant. We may
// also want to rate limit the number of rebalances per tenant for requests
// coming from the pod watcher.
func (b *Balancer) rebalance(ctx context.Context) {
// getTenantIDs ensures that tenants will have at least one connection.
tenantIDs := b.connTracker.getTenantIDs()

for _, tenantID := range tenantIDs {
tenantPods, err := b.directoryCache.TryLookupTenantPods(ctx, tenantID)
if err != nil {
// This case shouldn't really occur unless there's a bug in the
// directory server (e.g. deleted pod events, but the pod is still
// alive).
log.Errorf(ctx, "could not lookup pods for tenant %s: %v", tenantID, err.Error())
continue
}

// Construct a map so we could easily retrieve the pod by address.
podMap := make(map[string]*tenant.Pod)
var hasRunningPod bool
for _, pod := range tenantPods {
podMap[pod.Addr] = pod

if pod.State == tenant.RUNNING {
hasRunningPod = true
}
}

// Only attempt to rebalance if we have a RUNNING pod. In theory, this
// case would happen if we're scaling down from 1 to 0, which in that
// case, we can't transfer connections anywhere. Practically, we will
// never scale a tenant from 1 to 0 if there are still active
// connections, so this case should not occur.
if !hasRunningPod {
continue
}

activeList, idleList := b.connTracker.listAssignments(tenantID)
b.rebalancePartition(podMap, activeList)
b.rebalancePartition(podMap, idleList)
b.RebalanceTenant(ctx, tenantID)
}
}

// rebalancePartition rebalances the given assignments partition.
func (b *Balancer) rebalancePartition(
pods map[string]*tenant.Pod, assignments []*ServerAssignment,
) {
// Nothing to do here.
if len(pods) == 0 || len(assignments) == 0 {
// Nothing to do here if there are no assignments, or only one pod.
if len(pods) <= 1 || len(assignments) == 0 {
return
}

Expand All @@ -371,7 +434,7 @@ func (b *Balancer) rebalancePartition(
//
// NOTE: Elements in the list may be shuffled around once this method returns.
func (b *Balancer) enqueueRebalanceRequests(list []*ServerAssignment) {
toMoveCount := int(math.Ceil(float64(len(list)) * float64(rebalanceRate)))
toMoveCount := int(math.Ceil(float64(len(list)) * float64(b.rebalanceRate)))
partition, _ := partitionNRandom(list, toMoveCount)
for _, a := range partition {
b.queue.enqueue(&rebalanceRequest{