server: add server-wide limit on addsstable send concurrency #80494

Merged · 1 commit · May 24, 2022
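This PR threads a single, node-wide `limit.ConcurrentRequestLimiter` through every bulk ingestion sender — IMPORT and RESTORE processors, index backfills, and cluster-to-cluster stream ingestion — so the number of AddSSTable requests a node has in flight can be capped with the new `bulkio.ingest.sender_concurrency_limit` cluster setting (0, the default, means no limit). Below is a minimal sketch of the acquire/release pattern the batcher applies around each send, using the `pkg/util/limit` API as it appears in this diff; `sendOneSST` is a hypothetical stand-in for the real AddSSTable round-trip:

```go
// Sketch only (not part of the diff): the limiter pattern this PR
// applies around each AddSSTable send.
func sendWithLimit(
	ctx context.Context,
	limiter limit.ConcurrentRequestLimiter,
	sendOneSST func(context.Context) error, // hypothetical stand-in for addSSTable
) error {
	// Begin blocks while the limit is saturated; it returns an error only
	// if the context is canceled (or similar) while waiting for a slot.
	res, err := limiter.Begin(ctx)
	if err != nil {
		return err
	}
	defer res.Release() // free the slot for the next sender on this node
	return sendOneSST(ctx)
}
```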
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -258,6 +258,7 @@ go_test(
"//pkg/util/hlc",
"//pkg/util/ioctx",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -438,6 +438,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
writeAtBatchTS,
false, /* splitFilledRanges */
rd.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(),
rd.flowCtx.Cfg.BulkSenderLimiter,
)
if err != nil {
return summary, err
9 changes: 6 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor_test.go
@@ -12,6 +12,7 @@ import (
"context"
"fmt"
"io/ioutil"
math "math"
"os"
"path/filepath"
"reflect"
@@ -42,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
@@ -240,9 +242,10 @@ func runTestIngest(t *testing.T, init func(*cluster.Settings)) {
return cloud.MakeExternalStorage(ctx, dest, base.ExternalIODirConfig{},
s.ClusterSettings(), blobs.TestBlobServiceClient(s.ClusterSettings().ExternalIODir), nil, nil, nil)
},
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
Settings: s.ClusterSettings(),
Codec: keys.SystemSQLCodec,
BackupMonitor: mon.NewUnlimitedMonitor(ctx, "test", mon.MemoryResource, nil, nil, 0, s.ClusterSettings()),
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &tree.EvalContext{
Codec: keys.SystemSQLCodec,
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -108,6 +108,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
@@ -11,6 +11,7 @@ package streamingest
import (
"context"
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -29,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/stretchr/testify/require"
)
@@ -52,9 +54,10 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
registry := tc.Server(0).JobRegistry().(*jobs.Registry)
flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: kvDB,
JobRegistry: registry,
Settings: st,
DB: kvDB,
JobRegistry: registry,
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
@@ -211,7 +211,8 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) {
evalCtx := sip.FlowCtx.EvalCtx
db := sip.FlowCtx.Cfg.DB
var err error
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings, sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount())
sip.batcher, err = bulk.MakeStreamSSTBatcher(ctx, db, evalCtx.Settings,
sip.flowCtx.Cfg.BackupMonitor.MakeBoundAccount(), sip.flowCtx.Cfg.BulkSenderLimiter)
if err != nil {
sip.MoveToDraining(errors.Wrap(err, "creating stream sst batcher"))
return
@@ -11,6 +11,7 @@ package streamingest
import (
"context"
"fmt"
"math"
"net/url"
"strconv"
"sync"
@@ -40,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -551,10 +553,11 @@ func getStreamIngestionProcessor(

flowCtx := execinfra.FlowCtx{
Cfg: &execinfra.ServerConfig{
Settings: st,
DB: kvDB,
JobRegistry: registry,
TestingKnobs: execinfra.TestingKnobs{StreamingTestingKnobs: streamingTestingKnobs},
Settings: st,
DB: kvDB,
JobRegistry: registry,
TestingKnobs: execinfra.TestingKnobs{StreamingTestingKnobs: streamingTestingKnobs},
BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt),
},
EvalCtx: &evalCtx,
DiskMonitor: testDiskMonitor,
2 changes: 2 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
@@ -27,6 +27,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
@@ -62,6 +63,7 @@ go_test(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/limit",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/tracing",
3 changes: 3 additions & 0 deletions pkg/kv/bulk/buffering_adder.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -78,6 +79,7 @@ func MakeBulkAdder(
timestamp hlc.Timestamp,
opts kvserverbase.BulkAdderOptions,
bulkMon *mon.BytesMonitor,
sendLimiter limit.ConcurrentRequestLimiter,
) (*BufferingAdder, error) {
if opts.MinBufferSize == 0 {
opts.MinBufferSize = 32 << 20
@@ -98,6 +100,7 @@
batchTS: opts.BatchTimestamp,
writeAtBatchTS: opts.WriteAtBatchTimestamp,
mem: bulkMon.MakeBoundAccount(),
limiter: sendLimiter,
},
timestamp: timestamp,
maxBufferLimit: opts.MaxBufferSize,
104 changes: 74 additions & 30 deletions pkg/kv/bulk/sst_batcher.go
@@ -13,6 +13,7 @@ package bulk
import (
"bytes"
"context"
"math"
"sync/atomic"
"time"

@@ -28,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -55,8 +57,35 @@ var (
0,
settings.NonNegativeDuration,
)

senderConcurrency = settings.RegisterIntSetting(
settings.TenantWritable,
"bulkio.ingest.sender_concurrency_limit",
"maximum number of concurrent bulk ingest requests sent by any one sender, such as a processor in an IMPORT, index creation or RESTORE, etc (0 = no limit)",
0,
settings.NonNegativeInt,
)
)

// MakeAndRegisterConcurrencyLimiter makes a concurrency limiter and registers it
// with the setting on-change hook; it should be called only once during server
// setup due to the side-effects of the on-change registration.
func MakeAndRegisterConcurrencyLimiter(sv *settings.Values) limit.ConcurrentRequestLimiter {
newLimit := int(senderConcurrency.Get(sv))
if newLimit == 0 {
newLimit = math.MaxInt
}
l := limit.MakeConcurrentRequestLimiter("bulk-send-limit", newLimit)
senderConcurrency.SetOnChange(sv, func(ctx context.Context) {
newLimit := int(senderConcurrency.Get(sv))
if newLimit == 0 {
newLimit = math.MaxInt
}
l.SetLimit(newLimit)
})
return l
}

// SSTBatcher is a helper for bulk-adding many KVs in chunks via AddSSTable. An
// SSTBatcher can be handed KVs repeatedly and will make them into SSTs that are
// added when they reach the configured size, tracking the total added rows,
@@ -69,6 +98,7 @@ type SSTBatcher struct {
rc *rangecache.RangeCache
settings *cluster.Settings
mem mon.BoundAccount
limiter limit.ConcurrentRequestLimiter

// disallowShadowingBelow is described on roachpb.AddSSTableRequest.
disallowShadowingBelow hlc.Timestamp
@@ -147,6 +177,7 @@ func MakeSSTBatcher(
writeAtBatchTs bool,
splitFilledRanges bool,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
b := &SSTBatcher{
name: name,
@@ -156,6 +187,7 @@
writeAtBatchTS: writeAtBatchTs,
disableSplits: !splitFilledRanges,
mem: mem,
limiter: sendLimiter,
}
err := b.Reset(ctx)
return b, err
@@ -164,9 +196,13 @@
// MakeStreamSSTBatcher creates a batcher configured to ingest duplicate keys
// that might be received from a cluster to cluster stream.
func MakeStreamSSTBatcher(
ctx context.Context, db *kv.DB, settings *cluster.Settings, mem mon.BoundAccount,
ctx context.Context,
db *kv.DB,
settings *cluster.Settings,
mem mon.BoundAccount,
sendLimiter limit.ConcurrentRequestLimiter,
) (*SSTBatcher, error) {
b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem}
b := &SSTBatcher{db: db, settings: settings, ingestAll: true, mem: mem, limiter: sendLimiter}
err := b.Reset(ctx)
return b, err
}
@@ -473,12 +509,42 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
summary := b.rowCounter.BulkOpSummary
data := b.sstFile.Data()
batchTS := b.batchTS

res, err := b.limiter.Begin(ctx)
if err != nil {
return err
}

// If we're flushing due to a range boundary, we might be flushing this
// one buffer into many different ranges, and doing so one-by-one, waiting
// for each round-trip serially, could really add up; a buffer of random
// data that covers all of a 2000 range table would be flushing 2000 SSTs,
// each of which might be quite small, like 256kib, but still see, say, 50ms
// or more round-trip time. Doing those serially would then take minutes. If
// we can, instead send this SST and move on to the next while it is sent,
// we could reduce that considerably. One concern with doing so however is
// that you could potentially end up with an entire buffer's worth of SSTs
// all inflight at once, effectively doubling the memory footprint, so we
// need to reserve memory from a monitor for the sst before we move on to
// the next one; if memory is not available we'll just block on the send
// and then move on to the next send after this SST is no longer being held
// in memory.
flushAsync := reason == rangeFlush

var reserved int64
if flushAsync {
if err := b.mem.Grow(ctx, int64(cap(data))); err != nil {
log.VEventf(ctx, 3, "%s unable to reserve enough memory to flush async: %v", b.name, err)
flushAsync = false
} else {
reserved = int64(cap(data))
}
}

updatesLastRange := reason != rangeFlush
fn := func(ctx context.Context) error {
if err := b.addSSTable(ctx, batchTS, start, end, data, stats, updatesLastRange); err != nil {
b.mem.Shrink(ctx, reserved)
defer res.Release()
defer b.mem.Shrink(ctx, reserved)
if err := b.addSSTable(ctx, batchTS, start, end, data, stats, !flushAsync); err != nil {
return err
}
b.mu.Lock()
Expand All @@ -487,37 +553,15 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int) error {
atomic.AddInt64(&b.stats.batchWaitAtomic, int64(timeutil.Since(beforeFlush)))
atomic.AddInt64(&b.stats.dataSizeAtomic, int64(size))
b.mu.Unlock()
if reserved != 0 {
b.mem.Shrink(ctx, reserved)
}
return nil
}

if reason == rangeFlush {
// If we're flushing due to a range boundary, we might be flushing this
// one buffer into many different ranges, and doing so one-by-one, waiting
// for each round-trip serially, could really add up; a buffer of random
// data that covers all of a 2000 range table would be flushing 2000 SSTs,
// each of which might be quite small, like 256kib, but still see, say, 50ms
// or more round-trip time. Doing those serially would then take minutes. If
// we can, instead send this SST and move on to the next while it is sent,
// we could reduce that considerably. One concern with doing so however is
// that you could potentially end up with an entire buffer's worth of SSTs
// all inflight at once, effectively doubling the memory footprint, so we
// need to reserve memory from a monitor for the sst before we move on to
// the next one; if memory is not available we'll just block on the send
// and then move on to the next send after this SST is no longer being held
// in memory.
if flushAsync {
if b.asyncAddSSTs == (ctxgroup.Group{}) {
b.asyncAddSSTs = ctxgroup.WithContext(ctx)
}
if err := b.mem.Grow(ctx, int64(cap(data))); err != nil {
log.VEventf(ctx, 3, "%s unable to reserve enough memory to flush async: %v", b.name, err)
} else {
reserved = int64(cap(data))
b.asyncAddSSTs.GoCtx(fn)
return nil
}
b.asyncAddSSTs.GoCtx(fn)
return nil
}

return fn(ctx)
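To condense the restructured flush path above: every flush now claims a slot from the shared limiter before sending, and a range-boundary flush additionally tries to reserve the SST's memory from the bound account so the send can proceed asynchronously while the batcher moves on to the next range; if the reservation fails, it logs and falls back to a blocking send. A stripped-down view of that control flow, excerpted and condensed from `doFlush` above (stats bookkeeping and `asyncAddSSTs` group initialization elided):

```go
res, err := b.limiter.Begin(ctx) // cap on this node's in-flight AddSSTable sends
if err != nil {
	return err
}

flushAsync := reason == rangeFlush // only range-boundary flushes may go async

var reserved int64
if flushAsync {
	// Reserve the SST's memory up front so a whole buffer's worth of async
	// sends can't silently double the footprint; if the reservation fails,
	// degrade to a synchronous send instead of erroring out.
	if err := b.mem.Grow(ctx, int64(cap(data))); err != nil {
		flushAsync = false
	} else {
		reserved = int64(cap(data))
	}
}

fn := func(ctx context.Context) error {
	defer res.Release()
	defer b.mem.Shrink(ctx, reserved) // reserved is 0 on the synchronous path
	return b.addSSTable(ctx, batchTS, start, end, data, stats, !flushAsync)
}

if flushAsync {
	b.asyncAddSSTs.GoCtx(fn) // send in the background and return immediately
	return nil
}
return fn(ctx)
```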
6 changes: 4 additions & 2 deletions pkg/kv/bulk/sst_batcher_test.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)
@@ -133,9 +134,10 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
mockCache.Insert(ctx, r)

ts := hlc.Timestamp{WallTime: 100}
lots := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
b, err := bulk.MakeBulkAdder(
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, lots,
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
)
if err != nil {
t.Fatal(err)
5 changes: 4 additions & 1 deletion pkg/server/server_sql.go
@@ -590,6 +590,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {

clusterIDForSQL := cfg.rpcContext.LogicalClusterID

bulkSenderLimiter := bulk.MakeAndRegisterConcurrencyLimiter(&cfg.Settings.SV)

// Set up the DistSQL server.
distSQLCfg := execinfra.ServerConfig{
AmbientContext: cfg.AmbientCtx,
@@ -616,6 +618,7 @@
ParentDiskMonitor: cfg.TempStorageConfig.Mon,
BackfillerMonitor: backfillMemoryMonitor,
BackupMonitor: backupMemoryMonitor,
BulkSenderLimiter: bulkSenderLimiter,

ParentMemoryMonitor: rootSQLMemoryMonitor,
BulkAdder: func(
Expand All @@ -624,7 +627,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// Attach a child memory monitor to enable control over the BulkAdder's
// memory usage.
bulkMon := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "bulk-adder-monitor")
return bulk.MakeBulkAdder(ctx, db, cfg.distSender.RangeDescriptorCache(), cfg.Settings, ts, opts, bulkMon)
return bulk.MakeBulkAdder(ctx, db, cfg.distSender.RangeDescriptorCache(), cfg.Settings, ts, opts, bulkMon, bulkSenderLimiter)
},

Metrics: &distSQLMetrics,
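A note on the wiring above: `MakeAndRegisterConcurrencyLimiter` is called exactly once per SQL server, and the resulting limiter is both stored on the DistSQL `ServerConfig` as `BulkSenderLimiter` and captured by the `BulkAdder` constructor closure, so every bulk sender on the node draws from the same budget. The once-only requirement comes from the `SetOnChange` registration it performs; that hook maps the setting's 0 ("no limit") to `math.MaxInt` so that an unset limit never blocks sends.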
1 change: 1 addition & 0 deletions pkg/sql/execinfra/BUILD.bazel
@@ -58,6 +58,7 @@ go_library(
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/buildutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/metric",