storage: backpressure SSTs before other writes if L0 fills up
Currently we start compacting as soon as there are two or more files in
L0. If L0 reaches 20 files, RocksDB will start slowing all writes in an
attempt to help the compactor catch up.

This global slowdown affects foreground traffic, liveness, etc., and could
significantly increase the latency of live traffic or even impact
availability.

Directly ingesting large numbers of small files can make it easier to hit
this limit than it is with normal writes -- normal writes are buffered in
the memtable and flushed all at once, while direct ingestion can add a
large number of files very quickly.

Additionally, SST ingestions are currently only done by bulk operations
like RESTORE, IMPORT, or index backfills. These are all less
latency-sensitive than foreground traffic, so we'd prefer to backpressure
them before we risk approaching the global slowdown trigger.

Release note (performance improvement): backpressure bulk operations before other traffic.
dt committed Apr 10, 2019
1 parent 7669ac5 commit 639445d
Showing 8 changed files with 118 additions and 0 deletions.
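
For illustration, here is a minimal, self-contained Go sketch of the ramped delay this commit introduces, using the default values of the new cluster settings listed further down. The names (ingestDelay, l0FileThreshold, etc.) are illustrative only; the actual implementation is calculatePreIngestDelay in pkg/storage/engine/rocksdb.go below.

package main

import (
	"fmt"
	"time"
)

const (
	l0FileThreshold        = 20              // rocksdb.ingest_backpressure.l0_file_count_threshold
	maxDelay               = 5 * time.Second // rocksdb.ingest_backpressure.max_delay
	pendingCompactionLimit = 64 << 30        // rocksdb.ingest_backpressure.pending_compaction_threshold
)

// ingestDelay adds maxDelay/10 of delay for every L0 file over the threshold,
// capped at maxDelay, and jumps straight to maxDelay when the estimated
// pending compaction bytes exceed their limit.
func ingestDelay(l0Files, pendingCompactionBytes int64) time.Duration {
	if pendingCompactionBytes >= pendingCompactionLimit {
		return maxDelay
	}
	if l0Files <= l0FileThreshold {
		return 0
	}
	delay := time.Duration(l0Files-l0FileThreshold) * (maxDelay / 10)
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	for _, files := range []int64{20, 21, 25, 55} {
		fmt.Printf("%d L0 files -> %s\n", files, ingestDelay(files, 0))
	}
	// Prints: 20 L0 files -> 0s, 21 -> 500ms, 25 -> 2.5s, 55 -> 5s (capped).
}

The per-file ramp in this sketch is maxDelay/10, mirroring the constant used by calculatePreIngestDelay and the values expected by TestIngestDelayLimit below.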
4 changes: 4 additions & 0 deletions c-deps/libroach/engine.cc
@@ -203,6 +203,9 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
rep->GetIntProperty("rocksdb.estimate-pending-compaction-bytes",
&pending_compaction_bytes_estimate);

std::string l0_file_count_str;
rep->GetProperty("rocksdb.num-files-at-level0", &l0_file_count_str);

stats->block_cache_hits = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
stats->block_cache_misses = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
stats->block_cache_usage = (int64_t)block_cache->GetUsage();
@@ -216,6 +219,7 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
stats->compactions = (int64_t)event_listener->GetCompactions();
stats->table_readers_mem_estimate = table_readers_mem_estimate;
stats->pending_compaction_bytes_estimate = pending_compaction_bytes_estimate;
stats->l0_file_count = std::atoi(l0_file_count_str.c_str());
return kSuccess;
}

1 change: 1 addition & 0 deletions c-deps/libroach/include/libroach.h
@@ -348,6 +348,7 @@ typedef struct {
int64_t compactions;
int64_t table_readers_mem_estimate;
int64_t pending_compaction_bytes_estimate;
int64_t l0_file_count;
} DBStatsResult;

typedef struct {
4 changes: 4 additions & 0 deletions docs/generated/settings/settings.html
@@ -55,6 +55,10 @@
<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines that maximum size batch that will be pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_outstanding_size</code></td><td>byte size</td><td><code>256 KiB</code></td><td>maximum number of bytes used to track in-flight pipelined writes before disabling pipelining</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.delay_l0_file</code></td><td>duration</td><td><code>200ms</code></td><td>delay to add to SST ingestions per file in L0 over the configured limit</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.l0_file_count_threshold</code></td><td>integer</td><td><code>20</code></td><td>number of L0 files after which to backpressure SST ingestions</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.max_delay</code></td><td>duration</td><td><code>5s</code></td><td>maximum amount of time to backpressure a single SST ingestion</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.pending_compaction_threshold</code></td><td>byte size</td><td><code>64 GiB</code></td><td>pending compaction estimate above which to backpressure SST ingestions</td></tr>
<tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
<tr><td><code>schemachanger.backfiller.buffer_size</code></td><td>byte size</td><td><code>196 MiB</code></td><td>amount to buffer in memory during backfills</td></tr>
<tr><td><code>schemachanger.backfiller.max_sst_size</code></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for ingested files during backfills</td></tr>
5 changes: 5 additions & 0 deletions pkg/storage/engine/engine.go
@@ -307,6 +307,10 @@ type Engine interface {
// case of hard-links) when allowFileModifications true. See additional
// comments in db.cc's IngestExternalFile explaining modification behavior.
IngestExternalFiles(ctx context.Context, paths []string, skipWritingSeqNo, allowFileModifications bool) error
// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
PreIngestDelay(ctx context.Context)
// ApproximateDiskBytes returns an approximation of the on-disk size for the given key span.
ApproximateDiskBytes(from, to roachpb.Key) (uint64, error)
// CompactRange ensures that the specified range of key value pairs is
@@ -404,6 +408,7 @@ type Stats struct {
Compactions int64
TableReadersMemEstimate int64
PendingCompactionBytesEstimate int64
L0FileCount int64
}

// EnvStats is a set of RocksDB env stats, including encryption status.
76 changes: 76 additions & 0 deletions pkg/storage/engine/rocksdb.go
@@ -80,6 +80,30 @@ var rocksdbConcurrency = envutil.EnvOrDefaultInt(
return max
}())

var ingestDelayL0Threshold = settings.RegisterIntSetting(
"rocksdb.ingest_backpressure.l0_file_count_threshold",
"number of L0 files after which to backpressure SST ingestions",
20,
)

var ingestDelayPerFile = settings.RegisterDurationSetting(
"rocksdb.ingest_backpressure.delay_l0_file",
"delay to add to SST ingestions per file in L0 over the configured limit",
time.Millisecond*200,
)

var ingestDelayPendingLimit = settings.RegisterByteSizeSetting(
"rocksdb.ingest_backpressure.pending_compaction_threshold",
"pending compaction estimate above which to backpressure SST ingestions",
64<<30,
)

var ingestDelayTime = settings.RegisterDurationSetting(
"rocksdb.ingest_backpressure.max_delay",
"maximum amount of time to backpressure a single SST ingestion",
time.Second*5,
)

// Set to true to perform expensive iterator debug leak checking. In normal
// operation, we perform inexpensive iterator leak checking but those checks do
// not indicate where the leak arose. The expensive checking tracks the stack
@@ -1195,6 +1219,7 @@ func (r *RocksDB) GetStats() (*Stats, error) {
Compactions: int64(s.compactions),
TableReadersMemEstimate: int64(s.table_readers_mem_estimate),
PendingCompactionBytesEstimate: int64(s.pending_compaction_bytes_estimate),
L0FileCount: int64(s.l0_file_count),
}, nil
}

@@ -2966,6 +2991,57 @@ func (r *RocksDB) setAuxiliaryDir(d string) error {
return nil
}

// PreIngestDelay may choose to block for some duration if L0 has an excessive
// number of files in it or if PendingCompactionBytesEstimate is elevated. It
// is intended to be called before ingesting a new SST, since we'd rather
// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
// instance and impact all foreground traffic by adding too many files to it.
// After the number of L0 files exceeds the configured limit, it gradually
// begins delaying more for each additional file in L0 over the limit until
// hitting its configured (via settings) maximum delay. If the pending
// compaction limit is exceeded, it waits for the maximum delay.
func (r *RocksDB) PreIngestDelay(ctx context.Context) {
if r.cfg.Settings == nil {
return
}
stats, err := r.GetStats()
if err != nil {
log.Warningf(ctx, "failed to read stats: %+v", err)
return
}
targetDelay := calculatePreIngestDelay(r.cfg, stats)

if targetDelay == 0 {
return
}
log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %db pending compaction", targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate)

select {
case <-time.After(targetDelay):
case <-ctx.Done():
}
}

func calculatePreIngestDelay(cfg RocksDBConfig, stats *Stats) time.Duration {
maxDelay := ingestDelayTime.Get(&cfg.Settings.SV)
l0Filelimit := ingestDelayL0Threshold.Get(&cfg.Settings.SV)
compactionLimit := ingestDelayPendingLimit.Get(&cfg.Settings.SV)

if stats.PendingCompactionBytesEstimate >= compactionLimit {
return maxDelay
}
const ramp = 10
if stats.L0FileCount > l0Filelimit {
delayPerFile := maxDelay / time.Duration(ramp)
targetDelay := time.Duration(stats.L0FileCount-l0Filelimit) * delayPerFile
if targetDelay > maxDelay {
return maxDelay
}
return targetDelay
}
return 0
}

// IngestExternalFiles atomically links a slice of files into the RocksDB
// log-structured merge-tree.
func (r *RocksDB) IngestExternalFiles(
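For reference, a minimal usage sketch of the new hook (the bulkIngest helper, its package, and its argument values are illustrative and not part of the commit): callers are expected to invoke PreIngestDelay immediately before ingesting, which is the pattern followed by the call sites added in replica_proposal.go and store.go below.

package bulkutil // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

// bulkIngest sketches the intended call pattern: let the engine backpressure
// the bulk operation first, then link the SSTs into the LSM.
func bulkIngest(ctx context.Context, eng engine.Engine, paths []string) error {
	// PreIngestDelay may sleep (up to rocksdb.ingest_backpressure.max_delay)
	// if L0 already has too many files or the pending-compaction estimate is
	// over its threshold.
	eng.PreIngestDelay(ctx)
	// Once any delay has elapsed, ingest as usual. The boolean arguments are
	// illustrative; see the IngestExternalFiles comment in engine.go above.
	return eng.IngestExternalFiles(ctx, paths, true /* skipWritingSeqNo */, true /* allowFileModifications */)
}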
25 changes: 25 additions & 0 deletions pkg/storage/engine/rocksdb_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

const testCacheSize = 1 << 30 // 1 GB
@@ -1738,3 +1739,27 @@ func TestRocksDBWALFileEmptyBatch(t *testing.T) {
}
})
}

func TestIngestDelayLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
cfg := RocksDBConfig{Settings: cluster.MakeTestingClusterSettings()}

max, ramp := time.Second*5, time.Second*5/10

for _, tc := range []struct {
exp time.Duration
stats Stats
}{
{0, Stats{}},
{0, Stats{L0FileCount: 19}},
{0, Stats{L0FileCount: 20}},
{ramp, Stats{L0FileCount: 21}},
{ramp * 2, Stats{L0FileCount: 22}},
{max, Stats{L0FileCount: 55}},
{0, Stats{PendingCompactionBytesEstimate: 20 << 30}},
{max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}},
{max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}},
} {
require.Equal(t, tc.exp, calculatePreIngestDelay(cfg, &tc.stats))
}
}
2 changes: 2 additions & 0 deletions pkg/storage/replica_proposal.go
@@ -390,6 +390,8 @@ func addSSTablePreApply(
log.Fatalf(ctx, "sideloaded SSTable at term %d, index %d is missing", term, index)
}

eng.PreIngestDelay(ctx)

// as of VersionUnreplicatedRaftTruncatedState we were on rocksdb 5.17 so this
// cluster version should indicate that we will never use rocksdb < 5.16 to
// read these SSTs, so it is safe to use https://github.com/facebook/rocksdb/pull/4172
1 change: 1 addition & 0 deletions pkg/storage/store.go
@@ -2759,6 +2759,7 @@ func (s *Store) Send(
return nil, roachpb.NewError(err)
}
defer s.limiters.ConcurrentAddSSTableRequests.Finish()
s.engine.PreIngestDelay(ctx)
}

if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
