diff --git a/c-deps/libroach/engine.cc b/c-deps/libroach/engine.cc
index 6fbe3a89c375..69eedc8c62ed 100644
--- a/c-deps/libroach/engine.cc
+++ b/c-deps/libroach/engine.cc
@@ -203,6 +203,10 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
rep->GetIntProperty("rocksdb.estimate-pending-compaction-bytes",
&pending_compaction_bytes_estimate);
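+ // Fetch the number of SSTable files at level 0 of the LSM; this property
+ // is returned as a string and parsed into an integer below.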
+ std::string l0_file_count_str;
+ rep->GetProperty("rocksdb.num-files-at-level0",
+ &l0_file_count_str);
+
stats->block_cache_hits = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
stats->block_cache_misses = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
stats->block_cache_usage = (int64_t)block_cache->GetUsage();
@@ -216,6 +220,7 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
stats->compactions = (int64_t)event_listener->GetCompactions();
stats->table_readers_mem_estimate = table_readers_mem_estimate;
stats->pending_compaction_bytes_estimate = pending_compaction_bytes_estimate;
+ stats->l0_file_count = std::atoi(l0_file_count_str.c_str());
return kSuccess;
}
diff --git a/c-deps/libroach/include/libroach.h b/c-deps/libroach/include/libroach.h
index a3338a355597..38e701c0322c 100644
--- a/c-deps/libroach/include/libroach.h
+++ b/c-deps/libroach/include/libroach.h
@@ -348,6 +348,7 @@ typedef struct {
int64_t compactions;
int64_t table_readers_mem_estimate;
int64_t pending_compaction_bytes_estimate;
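+ // Number of SSTable files at level 0 of the LSM.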
+ int64_t l0_file_count;
} DBStatsResult;
typedef struct {
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 3028ed9a83f8..58ee1a8d51b1 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -55,6 +55,10 @@
kv.transaction.write_pipelining_enabled | boolean | true | if enabled, transactional writes are pipelined through Raft consensus |
kv.transaction.write_pipelining_max_batch_size | integer | 128 | if non-zero, defines that maximum size batch that will be pipelined through Raft consensus |
kv.transaction.write_pipelining_max_outstanding_size | byte size | 256 KiB | maximum number of bytes used to track in-flight pipelined writes before disabling pipelining |
+rocksdb.ingest_backpressure.delay_l0_file | duration | 200ms | delay to add to ingestions per file in L0 over the configured limit |
+rocksdb.ingest_backpressure.l0_file_count_threshold | integer | 20 | number of L0 files after which to backpressure SST ingestions |
+rocksdb.ingest_backpressure.max_delay | duration | 5s | maximum amount of time to backpressure ingestions |
+rocksdb.ingest_backpressure.pending_compaction_threshold | byte size | 64 GiB | pending compaction estimate above which to backpressure ingestions |
rocksdb.min_wal_sync_interval | duration | 0s | minimum duration between syncs of the RocksDB WAL |
schemachanger.backfiller.buffer_size | byte size | 196 MiB | amount to buffer in memory during backfills |
schemachanger.backfiller.max_sst_size | byte size | 16 MiB | target size for ingested files during backfills |
diff --git a/pkg/storage/engine/engine.go b/pkg/storage/engine/engine.go
index 782d77513f42..d6853091ad8c 100644
--- a/pkg/storage/engine/engine.go
+++ b/pkg/storage/engine/engine.go
@@ -404,6 +404,7 @@ type Stats struct {
Compactions int64
TableReadersMemEstimate int64
PendingCompactionBytesEstimate int64
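+ // L0FileCount is the number of SSTable files at level 0 of the LSM.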
+ L0FileCount int64
}
// EnvStats is a set of RocksDB env stats, including encryption status.
diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go
index 576fa6b129cc..1c336517dee5 100644
--- a/pkg/storage/engine/rocksdb.go
+++ b/pkg/storage/engine/rocksdb.go
@@ -80,6 +80,30 @@ var rocksdbConcurrency = envutil.EnvOrDefaultInt(
return max
}())
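+// The following cluster settings control backpressure of SST ingestions when
+// the engine is falling behind on compactions. See (*RocksDB).PreIngestDelay.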
+var ingestDelayL0Threshold = settings.RegisterIntSetting(
+ "rocksdb.ingest_backpressure.l0_file_count_threshold",
+ "number of L0 files after which to backpressure SST ingestions",
+ 20,
+)
+
+var ingestDelayPerFile = settings.RegisterDurationSetting(
+ "rocksdb.ingest_backpressure.delay_l0_file",
+ "delay to add to ingestions per file in L0 over the configured limit",
+ time.Millisecond*200,
+)
+
+var ingestDelayPendingLimit = settings.RegisterByteSizeSetting(
+ "rocksdb.ingest_backpressure.pending_compaction_threshold",
+ "pending compaction estimate above which to backpressure ingestions",
+ 64<<30,
+)
+
+var ingestDelayTime = settings.RegisterDurationSetting(
+ "rocksdb.ingest_backpressure.max_delay",
+ "maximum amount of time to backpressure ingestions",
+ time.Second*5,
+)
+
// Set to true to perform expensive iterator debug leak checking. In normal
// operation, we perform inexpensive iterator leak checking but those checks do
// not indicate where the leak arose. The expensive checking tracks the stack
@@ -1195,6 +1219,7 @@ func (r *RocksDB) GetStats() (*Stats, error) {
Compactions: int64(s.compactions),
TableReadersMemEstimate: int64(s.table_readers_mem_estimate),
PendingCompactionBytesEstimate: int64(s.pending_compaction_bytes_estimate),
+ L0FileCount: int64(s.l0_file_count),
}, nil
}
@@ -2966,6 +2991,60 @@ func (r *RocksDB) setAuxiliaryDir(d string) error {
return nil
}
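+// PreIngestDefaultDelayLimit is the zero-valued sentinel for PreIngestDelay's
+// maxDelay argument; passing it means the delay limit is read from the
+// rocksdb.ingest_backpressure.max_delay cluster setting instead.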
+var PreIngestDefaultDelayLimit time.Duration
+
+// PreIngestDelay may choose to block for some duration if L0 has an excessive
+// number of files in it or if PendingCompactionBytesEstimate is elevated. It
+// is intended to be called before ingesting a new SST, since we'd rather
+// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
+// instance and impact foreground traffic. Once the number of L0 files exceeds
+// the configured limit, the delay grows with each additional file until it
+// reaches maxDelay. If the pending compaction limit is exceeded, it waits for
+// the full maxDelay.
+func (r *RocksDB) PreIngestDelay(ctx context.Context, maxDelay time.Duration) {
+ if r.cfg.Settings == nil {
+ return
+ }
+ stats, err := r.GetStats()
+ if err != nil {
+ log.Warningf(ctx, "failed to read stats: %+v", err)
+ return
+ }
+ targetDelay := r.calculatePreIngestDelay(stats, maxDelay)
+
+ if targetDelay == 0 {
+ return
+ }
+ log.VEventf(ctx, 2, "delaying SST ingestion %s: %d L0 files, %d bytes pending compaction",
+ targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate)
+
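+ // Wait out the target delay, returning early if the context is canceled.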
+ select {
+ case <-time.After(targetDelay):
+ case <-ctx.Done():
+ }
+}
+
+func (r *RocksDB) calculatePreIngestDelay(stats *Stats, maxDelay time.Duration) time.Duration {
+ if maxDelay == PreIngestDefaultDelayLimit {
+ maxDelay = ingestDelayTime.Get(&r.cfg.Settings.SV)
+ }
+ l0FileLimit := ingestDelayL0Threshold.Get(&r.cfg.Settings.SV)
+ compactionLimit := ingestDelayPendingLimit.Get(&r.cfg.Settings.SV)
+
+ if stats.PendingCompactionBytesEstimate >= compactionLimit {
+ return maxDelay
+ }
+ const ramp = 10
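+ // Each L0 file over the threshold adds maxDelay/ramp to the delay, so with
+ // the default 5s maxDelay the delay grows by 500ms per file and reaches
+ // maxDelay 10 files past the threshold.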
+ if stats.L0FileCount > l0FileLimit {
+ delayPerFile := maxDelay / time.Duration(ramp)
+ targetDelay := time.Duration(stats.L0FileCount-l0FileLimit) * delayPerFile
+ if targetDelay > maxDelay {
+ return maxDelay
+ }
+ return targetDelay
+ }
+ return 0
+}
+
// IngestExternalFiles atomically links a slice of files into the RocksDB
// log-structured merge-tree.
func (r *RocksDB) IngestExternalFiles(
diff --git a/pkg/storage/engine/rocksdb_test.go b/pkg/storage/engine/rocksdb_test.go
index b742700092a4..662fd67081fe 100644
--- a/pkg/storage/engine/rocksdb_test.go
+++ b/pkg/storage/engine/rocksdb_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/stretchr/testify/require"
)
const testCacheSize = 1 << 30 // 1 GB
@@ -1738,3 +1739,33 @@ func TestRocksDBWALFileEmptyBatch(t *testing.T) {
}
})
}
+
+func TestIngestDelayLimit(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ r := &RocksDB{cfg: RocksDBConfig{Settings: cluster.MakeTestingClusterSettings()}}
+
+ max, ramp := time.Second*5, time.Second*5/10
+
+ for _, tc := range []struct {
+ exp time.Duration
+ stats Stats
+ max time.Duration
+ }{
+ {0, Stats{}, PreIngestDefaultDelayLimit},
+ {0, Stats{L0FileCount: 19}, PreIngestDefaultDelayLimit},
+ {0, Stats{L0FileCount: 20}, PreIngestDefaultDelayLimit},
+ {ramp, Stats{L0FileCount: 21}, PreIngestDefaultDelayLimit},
+ {ramp * 2, Stats{L0FileCount: 22}, PreIngestDefaultDelayLimit},
+ {max, Stats{L0FileCount: 35}, PreIngestDefaultDelayLimit},
+ {max, Stats{L0FileCount: 55}, PreIngestDefaultDelayLimit},
+ {time.Second * 2, Stats{L0FileCount: 35}, time.Second * 2},
+ {0, Stats{PendingCompactionBytesEstimate: 20 << 30}, PreIngestDefaultDelayLimit},
+ {time.Second * 2, Stats{PendingCompactionBytesEstimate: 80 << 30}, time.Second * 2},
+ {max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}, PreIngestDefaultDelayLimit},
+ {time.Second * 2, Stats{L0FileCount: 15, PendingCompactionBytesEstimate: 80 << 30}, time.Second * 2},
+ {time.Second * 2, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}, time.Second * 2},
+ {max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}, PreIngestDefaultDelayLimit},
+ } {
+ require.Equal(t, tc.exp, r.calculatePreIngestDelay(&tc.stats, tc.max))
+ }
+}
diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go
index c97796b07ec7..7180c70e5df7 100644
--- a/pkg/storage/replica_proposal.go
+++ b/pkg/storage/replica_proposal.go
@@ -390,6 +390,10 @@ func addSSTablePreApply(
log.Fatalf(ctx, "sideloaded SSTable at term %d, index %d is missing", term, index)
}
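+ // If the engine is backed by RocksDB, backpressure this ingestion while the
+ // engine catches up on compactions, capped by the max_delay cluster setting.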
+ if r, ok := eng.(*engine.RocksDB); ok {
+ r.PreIngestDelay(ctx, engine.PreIngestDefaultDelayLimit)
+ }
+
// as of VersionUnreplicatedRaftTruncatedState we were on rocksdb 5.17 so this
// cluster version should indicate that we will never use rocksdb < 5.16 to
// read these SSTs, so it is safe to use https://github.com/facebook/rocksdb/pull/4172
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 3e751cdd7d9b..f06b95637146 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -2764,6 +2764,10 @@ func (s *Store) Send(
return nil, roachpb.NewError(err)
}
defer s.limiters.ConcurrentAddSSTableRequests.Finish()
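+ // Backpressure the ingestion before evaluation if the engine is falling
+ // behind on compactions, with a fixed 15s cap on the delay rather than the
+ // max_delay cluster setting.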
+ const maxDelay = time.Second * 15
+ if rocks, ok := s.engine.(*engine.RocksDB); ok {
+ rocks.PreIngestDelay(ctx, maxDelay)
+ }
}
if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {