diff --git a/pkg/storage/batcheval/cmd_add_sstable.go b/pkg/storage/batcheval/cmd_add_sstable.go index 331431978f6f..052fcf25df46 100644 --- a/pkg/storage/batcheval/cmd_add_sstable.go +++ b/pkg/storage/batcheval/cmd_add_sstable.go @@ -76,6 +76,15 @@ func EvalAddSSTable( } ms.Add(stats) + // If using RocksDB, check if we want to back-pressure due to L0 files now in + // the hopes of avoiding it later when we're actually ingesting during raft + // application. Note: this will only protect the proposer, so replicas could + // still end up backpressuring during apply but this at least avoids an easily + // detected and avoidable blocking of our own raft application. + if rocks, ok := cArgs.EvalCtx.Engine().(*engine.RocksDB); ok { + rocks.PreIngestDelay(ctx) + } + return result.Result{ Replicated: storagepb.ReplicatedEvalResult{ AddSSTable: &storagepb.ReplicatedEvalResult_AddSSTable{ diff --git a/pkg/storage/batcheval/cmd_add_sstable_test.go b/pkg/storage/batcheval/cmd_add_sstable_test.go index 316ae1fa9e2e..5b14f14c1623 100644 --- a/pkg/storage/batcheval/cmd_add_sstable_test.go +++ b/pkg/storage/batcheval/cmd_add_sstable_test.go @@ -348,7 +348,8 @@ func TestAddSSTableMVCCStats(t *testing.T) { RequestHeader: roachpb.RequestHeader{Key: keys.MinKey, EndKey: keys.MaxKey}, Data: sstBytes, }, - Stats: &enginepb.MVCCStats{}, + Stats: &enginepb.MVCCStats{}, + EvalCtx: &batcheval.MockEvalCtx{MockEngine: e}, } _, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil) if err != nil { diff --git a/pkg/storage/engine/rocksdb.go b/pkg/storage/engine/rocksdb.go index 882a2ff9de18..4384f6755a6e 100644 --- a/pkg/storage/engine/rocksdb.go +++ b/pkg/storage/engine/rocksdb.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logtags" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/retry" 
"github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" humanize "github.com/dustin/go-humanize" @@ -2905,11 +2906,45 @@ func (r *RocksDB) setAuxiliaryDir(d string) error { return nil } +func (r *RocksDB) l0FileCount() int { + // TODO(dt): we could push a L0FileCount() method down instead of pulling all + // the files back with all their attributes just to count them. + var files int + for _, sst := range r.GetSSTables() { + if sst.Level == 0 { + files++ + } + } + return files +} + +// PreIngestDelay may choose to block for some duration if L0 has an excessive +// number of files in it -- it is intended to be called before ingesting a new +// SST since we'd rather backpressure the bulk operation adding SSTs before we +// hit the global slowdown trigger that would also impact foreground traffic or +// potentially cause liveness problems and associated unavailability. +func (r *RocksDB) PreIngestDelay(ctx context.Context) { + // options.cc specifies 20 files for the global write slowdown so backpressure + // SST additions well before then at 10. + const l0SlowdownFiles = 10 + for i, re := 1, retry.StartWithCtx(ctx, retry.Options{MaxRetries: 6}); re.Next(); i++ { + files := r.l0FileCount() + if files < l0SlowdownFiles { + return + } + log.Warningf(ctx, "delaying SST ingestion due to %d files in L0 (attempt %d)", files, i) + // TODO(dt): ideally we'd only backpressure if this SST is going to l0 -- if + // it doesn't overlap and is going lower, we're not actually adding to the + // problem so this isn't needed. + } +} + // IngestExternalFiles atomically links a slice of files into the RocksDB // log-structured merge-tree. func (r *RocksDB) IngestExternalFiles( ctx context.Context, paths []string, allowFileModifications bool, ) error { + r.PreIngestDelay(ctx) cPaths := make([]*C.char, len(paths)) for i := range paths { cPaths[i] = C.CString(paths[i])