Skip to content

Commit

Permalink
server: add server-wide limit on addsstable send concurrency
Browse files Browse the repository at this point in the history
While we generally prefer recipient-side limiting to better utilize all
available nodes' capacity, this adds an extra, sender-side limiter just in
case we find ourselves needing it. It defaults to unlimited.

Release note (sql change): The cluster setting bulkio.ingest.sender_concurrency_limit
can be used to adjust the concurrency at which any one SQL node, across all operations
that it is running such as RESTOREs, IMPORTs, and schema changes, will send bulk ingest
requests to the KV storage layer.
  • Loading branch information
dt committed Apr 25, 2022
1 parent 0059cd6 commit 6e7b085
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 45 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
if err != nil {
return summary, err
Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"fmt"
"io/ioutil"
math "math"
"os"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -42,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -240,9 +242,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil)
},
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &tree.EvalContext{
Codec: keys.SystemSQLCodec,
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package streamingest
import (
"context"
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)
Expand All @@ -52,9 +54,10 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: kvDB,
JobRegistry: registry,
Settings: st,
DB: kvDB,
JobRegistry: registry,
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
evalCtx := sip.FlowCtx.EvalCtx
db := sip.FlowCtx.Cfg.DB
var err error
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount())
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings,
sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), sip.flowCtx.Cfg.BulkSenderLimiter)
if err != nil {
sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package streamingest
import (
"context"
"fmt"
"math"
"net/url"
"strconv"
"sync"
Expand Down Expand Up @@ -40,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -551,10 +553,11 @@ func getStreamIngestionProcessor(

flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: kvDB,
JobRegistry: registry,
TestingKnobs: execinfra.TestingKnobs{StreamingTestingKnobs: streamingTestingKnobs},
Settings: st,
DB: kvDB,
JobRegistry: registry,
TestingKnobs: execinfra.TestingKnobs{StreamingTestingKnobs: streamingTestingKnobs},
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
Expand Down Expand Up @@ -62,6 +63,7 @@ go_test(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/tracing",
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -78,6 +79,7 @@ func MakeBulkAdder(
timestamp hlc.Timestamp,
opts kvserverbase.BulkAdderOptions,
bulkMon *mon.BytesMonitor,
sendLimiter limit.ConcurrentRequestLimiter,
) (*BufferingAdder, error) {
if opts.MinBufferSize == 0 {
opts.MinBufferSize = 32 << 20
Expand All @@ -98,6 +100,7 @@ func MakeBulkAdder(
batchTS: opts.BatchTimestamp,
writeAtBatchTS: opts.WriteAtBatchTimestamp,
mem: bulkMon.MakeBoundAccount(),
limiter: sendLimiter,
},
timestamp: timestamp,
maxBufferLimit: opts.MaxBufferSize,
Expand Down
104 changes: 74 additions & 30 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package bulk
import (
"bytes"
"context"
"math"
"sync/atomic"
"time"

Expand All @@ -28,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -55,8 +57,35 @@ var (
0,
settings.NonNegativeDuration,
)

senderConcurrency = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.ingest.sender_concurrency_limit",
"maximum number of concurrent bulk ingest requests sent by any one sender, such as a processor in an IMPORT, index creation or RESTORE, etc (0 = no limit)",
0,
settings.NonNegativeInt,
)
)

// MakeAndRegisterConcurrencyLimiter builds the "bulk-send-limit" concurrency
// limiter, sized from the current value of the senderConcurrency setting, and
// installs an on-change hook on that setting which resizes the limiter
// whenever the setting is updated. A setting value of 0 (described on the
// setting as "no limit") is translated to math.MaxInt.
//
// Because registering the on-change hook is a side effect, this should be
// called only once during server setup.
func MakeAndRegisterConcurrencyLimiter(sv *settings.Values) limit.ConcurrentRequestLimiter {
	// effectiveLimit reads the setting and maps the 0 sentinel to math.MaxInt.
	effectiveLimit := func() int {
		if n := int(senderConcurrency.Get(sv)); n != 0 {
			return n
		}
		return math.MaxInt
	}

	limiter := limit.MakeConcurrentRequestLimiter("bulk-send-limit", effectiveLimit())
	senderConcurrency.SetOnChange(sv, func(ctx context.Context) {
		// Re-read the setting on every change so the limiter tracks it.
		limiter.SetLimit(effectiveLimit())
	})
	return limiter
}

// SSTBatcher is a helper for bulk-adding many KVs in chunks via AddSSTable. An
// SSTBatcher can be handed KVs repeatedly and will make them into SSTs that are
// added when they reach the configured size, tracking the total added rows,
Expand All @@ -69,6 +98,7 @@ type SSTBatcher struct {
rc *rangecache.RangeCache
settings *cluster.Settings
mem mon.BoundAccount
limiter limit.ConcurrentRequestLimiter

// disallowShadowingBelow is described on roachpb.AddSSTableRequest.
disallowShadowingBelow hlc.Timestamp
Expand Down Expand Up @@ -147,6 +177,7 @@ func MakeSSTBatcher(
writeAtBatchTs bool,
splitFilledRanges bool,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
b := &SSTBatcher{
name: name,
Expand All @@ -156,6 +187,7 @@ func MakeSSTBatcher(
writeAtBatchTS: writeAtBatchTs,
disableSplits: !splitFilledRanges,
mem: mem,
limiter: sendLimiter,
}
err := b.Reset(ctx)
return b, err
Expand All @@ -164,9 +196,13 @@ func MakeSSTBatcher(
// MakeStreamSSTBatcher creates a batcher configured to ingest duplicate keys
// that might be received from a cluster to cluster stream.
//
// The batcher is constructed with ingestAll set to true and keeps the given
// db, settings and mem account; sendLimiter is stored as the batcher's
// limiter. Reset is called on the new batcher before returning, and any error
// from Reset is returned alongside the batcher.
func MakeStreamSSTBatcher(
ctx context.Context, db *kv.DB, settings *cluster.Settings, mem mon.BoundAccount,
ctx context.Context,
db *kv.DB,
settings *cluster.Settings,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem}
b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
err := b.Reset(ctx)
return b, err
}
Expand Down Expand Up @@ -473,12 +509,42 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
summary := b.rowCounter.BulkOpSummary
data := b.sstFile.Data()
batchTS := b.batchTS

res, err := b.limiter.Begin(ctx)
if err != nil {
return err
}

// If we're flushing due to a range boundary, we might be flushing this
// one buffer into many different ranges, and doing so one-by-one, waiting
// for each round-trip serially, could really add up; a buffer of random
// data that covers all of a 2000 range table would be flushing 2000 SSTs,
// each of which might be quite small, like 256kib, but still see, say, 50ms
// or more round-trip time. Doing those serially would then take minutes. If
// we can, instead send this SST and move on to the next while it is sent,
// we could reduce that considerably. One concern with doing so however is
// that you could potentially end up with an entire buffer's worth of SSTs
// all inflight at once, effectively doubling the memory footprint, so we
// need to reserve memory from a monitor for the sst before we move on to
// the next one; if memory is not available we'll just block on the send
// and then move on to the next send after this SST is no longer being held
// in memory.
flushAsync := reason == rangeFlush

var reserved int64
if flushAsync {
if err := b.mem.Grow(ctx, int64(cap(data))); err != nil {
log.VEventf(ctx, 3, "%s unable to reserve enough memory to flush async: %v", b.name, err)
flushAsync = false
} else {
reserved = int64(cap(data))
}
}

updatesLastRange := reason != rangeFlush
fn := func(ctx context.Context) error {
if err := b.addSSTable(ctx, batchTS, start, end, data, stats, updatesLastRange); err != nil {
b.mem.Shrink(ctx, reserved)
defer res.Release()
defer b.mem.Shrink(ctx, reserved)
if err := b.addSSTable(ctx, batchTS, start, end, data, stats, !flushAsync); err != nil {
return err
}
b.mu.Lock()
Expand All @@ -487,37 +553,15 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
atomic.AddInt64(&b.stats.batchWaitAtomic, int64(timeutil.Since(beforeFlush)))
atomic.AddInt64(&b.stats.dataSizeAtomic, int64(size))
b.mu.Unlock()
if reserved != 0 {
b.mem.Shrink(ctx, reserved)
}
return nil
}

if reason == rangeFlush {
// If we're flushing due to a range boundary, we might be flushing this
// one buffer into many different ranges, and doing so one-by-one, waiting
// for each round-trip serially, could really add up; a buffer of random
// data that covers all of a 2000 range table would be flushing 2000 SSTs,
// each of which might be quite small, like 256kib, but still see, say, 50ms
// or more round-trip time. Doing those serially would then take minutes. If
// we can, instead send this SST and move on to the next while it is sent,
// we could reduce that considerably. One concern with doing so however is
// that you could potentially end up with an entire buffer's worth of SSTs
// all inflight at once, effectively doubling the memory footprint, so we
// need to reserve memory from a monitor for the sst before we move on to
// the next one; if memory is not available we'll just block on the send
// and then move on to the next send after this SST is no longer being held
// in memory.
if flushAsync {
if b.asyncAddSSTs == (ctxgroup.Group{}) {
b.asyncAddSSTs = ctxgroup.WithContext(ctx)
}
if err := b.mem.Grow(ctx, int64(cap(data))); err != nil {
log.VEventf(ctx, 3, "%s unable to reserve enough memory to flush async: %v", b.name, err)
} else {
reserved = int64(cap(data))
b.asyncAddSSTs.GoCtx(fn)
return nil
}
b.asyncAddSSTs.GoCtx(fn)
return nil
}

return fn(ctx)
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
Expand Down Expand Up @@ -133,9 +134,10 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
mockCache.Insert(ctx, r)

ts := hlc.Timestamp{WallTime: 100}
lots := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
b, err := bulk.MakeBulkAdder(
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, lots,
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
)
if err != nil {
t.Fatal(err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {

clusterIDForSQL := cfg.rpcContext.LogicalClusterID

bulkSenderLimiter := bulk.MakeAndRegisterConcurrencyLimiter(&cfg.Settings.SV)

// Set up the DistSQL server.
distSQLCfg := execinfra.ServerConfig{
AmbientContext: cfg.AmbientCtx,
Expand All @@ -616,6 +618,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ParentDiskMonitor: cfg.TempStorageConfig.Mon,
BackfillerMonitor: backfillMemoryMonitor,
BackupMonitor: backupMemoryMonitor,
BulkSenderLimiter: bulkSenderLimiter,

ParentMemoryMonitor: rootSQLMemoryMonitor,
BulkAdder: func(
Expand All @@ -624,7 +627,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// Attach a child memory monitor to enable control over the BulkAdder's
// memory usage.
bulkMon := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "bulk-adder-monitor")
return bulk.MakeBulkAdder(ctx, db, cfg.distSender.RangeDescriptorCache(), cfg.Settings, ts, opts, bulkMon)
return bulk.MakeBulkAdder(ctx, db, cfg.distSender.RangeDescriptorCache(), cfg.Settings, ts, opts, bulkMon, bulkSenderLimiter)
},

Metrics: &distSQLMetrics,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",
Expand Down
Loading

0 comments on commit 6e7b085

Please sign in to comment.