Skip to content

Commit

Permalink
storage: backpressure SSTs before other writes if L0 fills up
Browse files Browse the repository at this point in the history
Currently we start compacting as soon as there are two or more files in
L0. If we reach 20, RocksDB will start slowing all writes in an attempt
to help the compactor catch up.

This global slowdown will affect foreground traffic, liveness, etc and
could significantly impact latency of live traffic or even availability.

Directly ingesting large numbers of small files could make it easier to
hit this limit than it usually is with normal writes -- normal writes
are buffered in the memtable and flushed all at once but direct
ingestion could add potentially lots of files rather quickly.

Additionally, SST ingetions are currently only done by bulk operations
like RESTORE, IMPORT or index backfills. These are all less latency
sensitive then foreground traffic, so we'd prefer to backpressure them
before we risk approaching the global slowdown trigger.

Release note (performance improvement): backpressure bulk operations before other traffic.
  • Loading branch information
dt committed Jan 25, 2019
1 parent fee2bee commit 1aeac0b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
9 changes: 9 additions & 0 deletions pkg/storage/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ func EvalAddSSTable(
}
ms.Add(stats)

// If using RocksDB, check if we want to back-pressure due to L0 files now in
// the hopes of avoiding it later when we're actually ingesting during raft
// application. Note: this will only protect the proposer, so replicas could
// still end up backpressuring during apply but this at least avoids an easily
// detected and avoidable blocking of our own raft application.
if rocks, ok := cArgs.EvalCtx.Engine().(*engine.RocksDB); ok {
rocks.PreIngestDelay(ctx)
}

return result.Result{
Replicated: storagepb.ReplicatedEvalResult{
AddSSTable: &storagepb.ReplicatedEvalResult_AddSSTable{
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ func TestAddSSTableMVCCStats(t *testing.T) {
RequestHeader: roachpb.RequestHeader{Key: keys.MinKey, EndKey: keys.MaxKey},
Data: sstBytes,
},
Stats: &enginepb.MVCCStats{},
Stats: &enginepb.MVCCStats{},
EvalCtx: &batcheval.MockEvalCtx{MockEngine: e},
}
_, err := batcheval.EvalAddSSTable(ctx, e, cArgs, nil)
if err != nil {
Expand Down
35 changes: 35 additions & 0 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logtags"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
humanize "github.com/dustin/go-humanize"
Expand Down Expand Up @@ -2905,11 +2906,45 @@ func (r *RocksDB) setAuxiliaryDir(d string) error {
return nil
}

func (r *RocksDB) l0FileCount() int {
// TODO(dt): we could push a L0FileCount() method down instead of pulling all
// the files back with all their attributes just to count them.
var files int
for _, sst := range r.GetSSTables() {
if sst.Level == 0 {
files++
}
}
return files
}

// PreIngestDelay may choose to block for some duration if L0 has an exessive
// number of files in it -- it is intended to be called before ingesting a new
// SST since we'd rather backpressure the bulk operation adding SSTs before we
// hit the global slowdown trigger that would also impact forground traffic or
// potentially cause liveness problems and associated unavailability.
func (r *RocksDB) PreIngestDelay(ctx context.Context) {
// options.cc specifies 20 files for the global write slowdown so backpressure
// SST additions well before then at 10.
const l0SlowdownFiles = 10
for i, re := 1, retry.StartWithCtx(ctx, retry.Options{MaxRetries: 6}); re.Next(); i++ {
files := r.l0FileCount()
if files < l0SlowdownFiles {
return
}
log.Warningf(ctx, "delaying SST ingestion due to %d files in L0 (attempt %d)", files, i)
// TODO(dt): ideally we'd only backpressure if this SST is going to l0 -- if
// it doesn't overlap and is going lower, we're not actually adding to the
// problem so this isn't needed.
}
}

// IngestExternalFiles atomically links a slice of files into the RocksDB
// log-structured merge-tree.
func (r *RocksDB) IngestExternalFiles(
ctx context.Context, paths []string, allowFileModifications bool,
) error {
r.PreIngestDelay(ctx)
cPaths := make([]*C.char, len(paths))
for i := range paths {
cPaths[i] = C.CString(paths[i])
Expand Down

0 comments on commit 1aeac0b

Please sign in to comment.