diff --git a/c-deps/libroach/engine.cc b/c-deps/libroach/engine.cc
index 6fbe3a89c375..dcae7f352d34 100644
--- a/c-deps/libroach/engine.cc
+++ b/c-deps/libroach/engine.cc
@@ -203,6 +203,9 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
rep->GetIntProperty("rocksdb.estimate-pending-compaction-bytes",
&pending_compaction_bytes_estimate);
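+ // The L0 file count property is returned as a string; fetch it here and
+ // parse it into an integer below.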
+ std::string l0_file_count_str;
+ rep->GetProperty("rocksdb.num-files-at-level0", &l0_file_count_str);
+
stats->block_cache_hits = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
stats->block_cache_misses = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
stats->block_cache_usage = (int64_t)block_cache->GetUsage();
@@ -216,6 +219,7 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
stats->compactions = (int64_t)event_listener->GetCompactions();
stats->table_readers_mem_estimate = table_readers_mem_estimate;
stats->pending_compaction_bytes_estimate = pending_compaction_bytes_estimate;
+ stats->l0_file_count = std::atoi(l0_file_count_str.c_str());
return kSuccess;
}
diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h
index a3338a355597..38e701c0322c 100644
--- a/c-deps/libroach/include/libroach.h
+++ b/c-deps/libroach/include/libroach.h
@@ -348,6 +348,7 @@ typedef struct {
int64_t compactions;
int64_t table_readers_mem_estimate;
int64_t pending_compaction_bytes_estimate;
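+ // Number of SST files in level 0 (the "rocksdb.num-files-at-level0" property).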
+ int64_t l0_file_count;
} DBStatsResult;
typedef struct {
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 3028ed9a83f8..4f688555d36a 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -55,6 +55,10 @@
kv.transaction.write_pipelining_enabled | boolean | true | if enabled, transactional writes are pipelined through Raft consensus |
kv.transaction.write_pipelining_max_batch_size | integer | 128 | if non-zero, defines that maximum size batch that will be pipelined through Raft consensus |
kv.transaction.write_pipelining_max_outstanding_size | byte size | 256 KiB | maximum number of bytes used to track in-flight pipelined writes before disabling pipelining |
+rocksdb.ingest_backpressure.delay_l0_file | duration | 200ms | delay to add to SST ingestions per file in L0 over the configured limit |
+rocksdb.ingest_backpressure.l0_file_count_threshold | integer | 20 | number of L0 files after which to backpressure SST ingestions |
+rocksdb.ingest_backpressure.max_delay | duration | 5s | maximum amount of time to backpressure a single SST ingestion |
+rocksdb.ingest_backpressure.pending_compaction_threshold | byte size | 64 GiB | pending compaction estimate above which to backpressure SST ingestions |
rocksdb.min_wal_sync_interval | duration | 0s | minimum duration between syncs of the RocksDB WAL |
schemachanger.backfiller.buffer_size | byte size | 196 MiB | amount to buffer in memory during backfills |
schemachanger.backfiller.max_sst_size | byte size | 16 MiB | target size for ingested files during backfills |
diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go
index 782d77513f42..2f9fb1031287 100644
--- a/pkg/storage/engine/engine.go
+++ b/pkg/storage/engine/engine.go
@@ -307,6 +307,10 @@ type Engine interface {
// case of hard-links) when allowFileModifications true. See additional
// comments in db.cc's IngestExternalFile explaining modification behavior.
IngestExternalFiles(ctx context.Context, paths []string, skipWritingSeqNo, allowFileModifications bool) error
+ // PreIngestDelay offers an engine the chance to backpressure ingestions.
+ // When called, it may choose to block if the engine determines that it is in
+ // or approaching a state where further ingestions may risk its health.
+ PreIngestDelay(ctx context.Context)
// ApproximateDiskBytes returns an approximation of the on-disk size for the given key span.
ApproximateDiskBytes(from, to roachpb.Key) (uint64, error)
// CompactRange ensures that the specified range of key value pairs is
@@ -404,6 +408,7 @@ type Stats struct {
Compactions int64
TableReadersMemEstimate int64
PendingCompactionBytesEstimate int64
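+ // L0FileCount is the number of SST files in level 0 of the LSM.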
+ L0FileCount int64
}
// EnvStats is a set of RocksDB env stats, including encryption status.
diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go
index 576fa6b129cc..b34bc38de401 100644
--- a/pkg/storage/engine/rocksdb.go
+++ b/pkg/storage/engine/rocksdb.go
@@ -80,6 +80,30 @@ var rocksdbConcurrency = envutil.EnvOrDefaultInt(
return max
}())
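+// The following settings control backpressure applied to SST ingestions when
+// the engine is at risk of being overloaded, i.e. when L0 holds too many
+// files or the pending compaction estimate grows too large.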
+var ingestDelayL0Threshold = settings.RegisterIntSetting(
+ "rocksdb.ingest_backpressure.l0_file_count_threshold",
+ "number of L0 files after which to backpressure SST ingestions",
+ 20,
+)
+
+var ingestDelayPerFile = settings.RegisterDurationSetting(
+ "rocksdb.ingest_backpressure.delay_l0_file",
+ "delay to add to SST ingestions per file in L0 over the configured limit",
+ time.Millisecond*200,
+)
+
+var ingestDelayPendingLimit = settings.RegisterByteSizeSetting(
+ "rocksdb.ingest_backpressure.pending_compaction_threshold",
+ "pending compaction estimate above which to backpressure SST ingestions",
+ 64<<30,
+)
+
+var ingestDelayTime = settings.RegisterDurationSetting(
+ "rocksdb.ingest_backpressure.max_delay",
+ "maximum amount of time to backpressure a single SST ingestion",
+ time.Second*5,
+)
+
// Set to true to perform expensive iterator debug leak checking. In normal
// operation, we perform inexpensive iterator leak checking but those checks do
// not indicate where the leak arose. The expensive checking tracks the stack
@@ -1195,6 +1219,7 @@ func (r *RocksDB) GetStats() (*Stats, error) {
Compactions: int64(s.compactions),
TableReadersMemEstimate: int64(s.table_readers_mem_estimate),
PendingCompactionBytesEstimate: int64(s.pending_compaction_bytes_estimate),
+ L0FileCount: int64(s.l0_file_count),
}, nil
}
@@ -2966,6 +2991,57 @@ func (r *RocksDB) setAuxiliaryDir(d string) error {
return nil
}
+// PreIngestDelay may choose to block for some duration if L0 has an excessive
+// number of files in it or if PendingCompactionBytesEstimate is elevated. It
+// is intended to be called before ingesting a new SST, since we'd rather
+// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
+// instance and impact all foreground traffic by adding too many files to it.
+// Once the number of L0 files exceeds the configured limit, the delay grows
+// with each additional file in L0 over that limit, up to the maximum delay
+// configured via cluster settings. If the pending compaction estimate exceeds
+// its configured limit, the maximum delay is applied.
+func (r *RocksDB) PreIngestDelay(ctx context.Context) {
+ if r.cfg.Settings == nil {
+ return
+ }
+ stats, err := r.GetStats()
+ if err != nil {
+ log.Warningf(ctx, "failed to read stats: %+v", err)
+ return
+ }
+ targetDelay := calculatePreIngestDelay(r.cfg, stats)
+
+ if targetDelay == 0 {
+ return
+ }
+ log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %d bytes pending compaction",
+ targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate)
+
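+ // Wait out the target delay, returning early if the caller's context is
+ // canceled.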
+ select {
+ case <-time.After(targetDelay):
+ case <-ctx.Done():
+ }
+}
+
+func calculatePreIngestDelay(cfg RocksDBConfig, stats *Stats) time.Duration {
+ maxDelay := ingestDelayTime.Get(&cfg.Settings.SV)
+ l0Filelimit := ingestDelayL0Threshold.Get(&cfg.Settings.SV)
+ compactionLimit := ingestDelayPendingLimit.Get(&cfg.Settings.SV)
+
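+ // If the pending compaction estimate is already over its configured limit,
+ // back off for the full maximum delay.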
+ if stats.PendingCompactionBytesEstimate >= compactionLimit {
+ return maxDelay
+ }
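+ // Otherwise, grow the delay linearly with the number of L0 files over the
+ // threshold, capping it at the maximum delay.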
+ const ramp = 10
+ if stats.L0FileCount > l0Filelimit {
+ delayPerFile := maxDelay / time.Duration(ramp)
+ targetDelay := time.Duration(stats.L0FileCount-l0Filelimit) * delayPerFile
+ if targetDelay > maxDelay {
+ return maxDelay
+ }
+ return targetDelay
+ }
+ return 0
+}
+
// IngestExternalFiles atomically links a slice of files into the RocksDB
// log-structured merge-tree.
func (r *RocksDB) IngestExternalFiles(
diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go
index b742700092a4..cadc9432cce5 100644
--- a/pkg/storage/engine/rocksdb_test.go
+++ b/pkg/storage/engine/rocksdb_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/stretchr/testify/require"
)
const testCacheSize = 1 << 30 // 1 GB
@@ -1738,3 +1739,27 @@ func TestRocksDBWALFileEmptyBatch(t *testing.T) {
}
})
}
+
+func TestIngestDelayLimit(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ cfg := RocksDBConfig{Settings: cluster.MakeTestingClusterSettings()}
+
+ max, ramp := time.Second*5, time.Second*5/10
+
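+ // With the default settings (20-file L0 threshold, 5s maximum delay), the
+ // delay ramps up in steps of max/10 per file over the threshold, and the
+ // 64 GiB pending compaction threshold triggers the maximum delay on its own.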
+ for _, tc := range []struct {
+ exp time.Duration
+ stats Stats
+ }{
+ {0, Stats{}},
+ {0, Stats{L0FileCount: 19}},
+ {0, Stats{L0FileCount: 20}},
+ {ramp, Stats{L0FileCount: 21}},
+ {ramp * 2, Stats{L0FileCount: 22}},
+ {max, Stats{L0FileCount: 55}},
+ {0, Stats{PendingCompactionBytesEstimate: 20 << 30}},
+ {max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}},
+ {max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}},
+ } {
+ require.Equal(t, tc.exp, calculatePreIngestDelay(cfg, &tc.stats))
+ }
+}
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index e47cc74d16cf..0f74b5f2a325 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -390,6 +390,8 @@ func addSSTablePreApply(
log.Fatalf(ctx, "sideloaded SSTable at term %d, index %d is missing", term, index)
}
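+ // Give the engine a chance to delay this ingestion if it is at risk of
+ // being overloaded, e.g. by an excessive number of L0 files.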
+ eng.PreIngestDelay(ctx)
+
// as of VersionUnreplicatedRaftTruncatedState we were on rocksdb 5.17 so this
// cluster version should indicate that we will never use rocksdb < 5.16 to
// read these SSTs, so it is safe to use https://github.com/facebook/rocksdb/pull/4172
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 4dd2a64899be..c369a57fcd9b 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -2759,6 +2759,7 @@ func (s *Store) Send(
return nil, roachpb.NewError(err)
}
defer s.limiters.ConcurrentAddSSTableRequests.Finish()
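+ // Also allow the engine to delay the request itself if it is struggling to
+ // keep up with compactions.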
+ s.engine.PreIngestDelay(ctx)
}
if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {