storage: backpressure SSTs before other writes if L0 fills up
Currently we start compacting as soon as there are two or more files in
L0. If we reach 20, RocksDB will start slowing all writes in an attempt
to help the compactor catch up.

This global slowdown will affect foreground traffic, liveness, etc., and could
significantly impact the latency of live traffic or even availability.

Directly ingesting large numbers of small files can make it easier to
hit this limit than it usually is with normal writes -- normal writes
are buffered in the memtable and flushed all at once, while direct
ingestion can add a large number of files rather quickly.

Additionally, SST ingestions are currently only done by bulk operations
like RESTORE, IMPORT, or index backfills. These are all less
latency-sensitive than foreground traffic, so we'd prefer to backpressure them
before we risk approaching the global slowdown trigger.

Release note (performance improvement): backpressure bulk operations before other traffic.
dt committed Apr 9, 2019
1 parent 8426a2c commit 4ad3442
Showing 8 changed files with 129 additions and 0 deletions.
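
In rough terms, the change adds a check like the following before each SST ingestion. This is a minimal, runnable Go sketch with an illustrative function name (preIngestDelay) and hard-coded defaults; the actual implementation in pkg/storage/engine/rocksdb.go below reads its thresholds from the new rocksdb.ingest_backpressure.* cluster settings and stops waiting early if the caller's context is canceled.

package main

import (
	"fmt"
	"time"
)

// preIngestDelay sketches the ramped backpressure applied before an SST
// ingestion. Names and constants here are illustrative only.
func preIngestDelay(l0FileCount, pendingCompactionBytes int64) time.Duration {
	const (
		l0Threshold  = 20              // L0 file count at which delaying starts
		maxDelay     = 5 * time.Second // cap on any single delay
		pendingLimit = 64 << 30        // pending-compaction bytes that force the max delay
		ramp         = 10              // files over the threshold needed to reach maxDelay
	)
	if pendingCompactionBytes >= pendingLimit {
		return maxDelay
	}
	if l0FileCount <= l0Threshold {
		return 0
	}
	delay := time.Duration(l0FileCount-l0Threshold) * (maxDelay / ramp)
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	fmt.Println(preIngestDelay(22, 0))      // 1s: two files over the threshold
	fmt.Println(preIngestDelay(50, 0))      // 5s: ramp capped at the max delay
	fmt.Println(preIngestDelay(5, 100<<30)) // 5s: pending compactions over the limit
}
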
5 changes: 5 additions & 0 deletions c-deps/libroach/engine.cc
@@ -203,6 +203,10 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
rep->GetIntProperty("rocksdb.estimate-pending-compaction-bytes",
&pending_compaction_bytes_estimate);

std::string l0_file_count_str;
rep->GetProperty("rocksdb.num-files-at-level0",
&l0_file_count_str);

stats->block_cache_hits = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
stats->block_cache_misses = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
stats->block_cache_usage = (int64_t)block_cache->GetUsage();
@@ -216,6 +220,7 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
stats->compactions = (int64_t)event_listener->GetCompactions();
stats->table_readers_mem_estimate = table_readers_mem_estimate;
stats->pending_compaction_bytes_estimate = pending_compaction_bytes_estimate;
stats->l0_file_count = std::atoi(l0_file_count_str.c_str());
return kSuccess;
}

1 change: 1 addition & 0 deletions c-deps/libroach/include/libroach.h
@@ -348,6 +348,7 @@ typedef struct {
int64_t compactions;
int64_t table_readers_mem_estimate;
int64_t pending_compaction_bytes_estimate;
int64_t l0_file_count;
} DBStatsResult;

typedef struct {
4 changes: 4 additions & 0 deletions docs/generated/settings/settings.html
@@ -55,6 +55,10 @@
<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines the maximum size batch that will be pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_outstanding_size</code></td><td>byte size</td><td><code>256 KiB</code></td><td>maximum number of bytes used to track in-flight pipelined writes before disabling pipelining</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.delay_l0_file</code></td><td>duration</td><td><code>200ms</code></td><td>delay to add to ingestions per file in L0 over the configured limit</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.l0_file_count_threshold</code></td><td>integer</td><td><code>20</code></td><td>number of L0 files after which to backpressure SST ingestions</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.max_delay</code></td><td>duration</td><td><code>5s</code></td><td>maximum amount of time to backpressure ingestions</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.pending_compaction_threshold</code></td><td>byte size</td><td><code>64 GiB</code></td><td>pending compaction estimate above which to backpressure ingestions</td></tr>
<tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
<tr><td><code>schemachanger.backfiller.buffer_size</code></td><td>byte size</td><td><code>196 MiB</code></td><td>amount to buffer in memory during backfills</td></tr>
<tr><td><code>schemachanger.backfiller.max_sst_size</code></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for ingested files during backfills</td></tr>
1 change: 1 addition & 0 deletions pkg/storage/engine/engine.go
@@ -404,6 +404,7 @@ type Stats struct {
Compactions int64
TableReadersMemEstimate int64
PendingCompactionBytesEstimate int64
L0FileCount int64
}

// EnvStats is a set of RocksDB env stats, including encryption status.
79 changes: 79 additions & 0 deletions pkg/storage/engine/rocksdb.go
@@ -80,6 +80,30 @@ var rocksdbConcurrency = envutil.EnvOrDefaultInt(
return max
}())

var ingestDelayL0Threshold = settings.RegisterIntSetting(
	"rocksdb.ingest_backpressure.l0_file_count_threshold",
	"number of L0 files after which to backpressure SST ingestions",
	20,
)

var ingestDelayPerFile = settings.RegisterDurationSetting(
	"rocksdb.ingest_backpressure.delay_l0_file",
	"delay to add to ingestions per file in L0 over the configured limit",
	time.Millisecond*200,
)

var ingestDelayPendingLimit = settings.RegisterByteSizeSetting(
	"rocksdb.ingest_backpressure.pending_compaction_threshold",
	"pending compaction estimate above which to backpressure ingestions",
	64<<30,
)

var ingestDelayTime = settings.RegisterDurationSetting(
	"rocksdb.ingest_backpressure.max_delay",
	"maximum amount of time to backpressure ingestions",
	time.Second*5,
)

// Set to true to perform expensive iterator debug leak checking. In normal
// operation, we perform inexpensive iterator leak checking but those checks do
// not indicate where the leak arose. The expensive checking tracks the stack
@@ -1195,6 +1219,7 @@ func (r *RocksDB) GetStats() (*Stats, error) {
Compactions: int64(s.compactions),
TableReadersMemEstimate: int64(s.table_readers_mem_estimate),
PendingCompactionBytesEstimate: int64(s.pending_compaction_bytes_estimate),
L0FileCount: int64(s.l0_file_count),
}, nil
}

@@ -2966,6 +2991,60 @@ func (r *RocksDB) setAuxiliaryDir(d string) error {
return nil
}

// PreIngestDefaultDelayLimit is the zero value; passing it to PreIngestDelay
// as maxDelay means the limit is read from the
// rocksdb.ingest_backpressure.max_delay cluster setting.
var PreIngestDefaultDelayLimit time.Duration

// PreIngestDelay may choose to block for some duration if L0 has an excessive
// number of files in it or if PendingCompactionBytesEstimate is elevated. It
// is intended to be called before ingesting a new SST, since we'd rather
// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
// instance and impact foreground traffic. After the number of L0 files exceeds
// the configured limit, it gradually begins delaying more for each additional
// file until hitting maxDelay. If the pending compaction limit is exceeded, it
// waits for maxDelay.
func (r *RocksDB) PreIngestDelay(ctx context.Context, maxDelay time.Duration) {
	if r.cfg.Settings == nil {
		return
	}
	stats, err := r.GetStats()
	if err != nil {
		log.Warningf(ctx, "failed to read stats: %+v", err)
		return
	}
	targetDelay := r.calculatePreIngestDelay(stats, maxDelay)

	if targetDelay == 0 {
		return
	}
	log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %db pending compaction", targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate)

	select {
	case <-time.After(targetDelay):
	case <-ctx.Done():
	}
}

func (r *RocksDB) calculatePreIngestDelay(stats *Stats, maxDelay time.Duration) time.Duration {
	if maxDelay == PreIngestDefaultDelayLimit {
		maxDelay = ingestDelayTime.Get(&r.cfg.Settings.SV)
	}
	l0Filelimit := ingestDelayL0Threshold.Get(&r.cfg.Settings.SV)
	compactionLimit := ingestDelayPendingLimit.Get(&r.cfg.Settings.SV)

	if stats.PendingCompactionBytesEstimate >= compactionLimit {
		return maxDelay
	}
	const ramp = 10
	if stats.L0FileCount > l0Filelimit {
		delayPerFile := maxDelay / time.Duration(ramp)
		targetDelay := time.Duration(stats.L0FileCount-l0Filelimit) * delayPerFile
		if targetDelay > maxDelay {
			return maxDelay
		}
		return targetDelay
	}
	return 0
}

// IngestExternalFiles atomically links a slice of files into the RocksDB
// log-structured merge-tree.
func (r *RocksDB) IngestExternalFiles(
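With the defaults above, the ramp constant of 10 in calculatePreIngestDelay works out to 500ms of delay per L0 file over the 20-file threshold: 21 files delays an ingestion by 500ms, 22 files by 1s, and 30 or more files, or a pending-compaction estimate at or above 64 GiB, pins the delay at the full 5s. The test below exercises these cases along with non-default maximum delays.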
31 changes: 31 additions & 0 deletions pkg/storage/engine/rocksdb_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

const testCacheSize = 1 << 30 // 1 GB
@@ -1738,3 +1739,33 @@ func TestRocksDBWALFileEmptyBatch(t *testing.T) {
}
})
}

func TestIngestDelayLimit(t *testing.T) {
	r := &RocksDB{cfg: RocksDBConfig{Settings: cluster.MakeTestingClusterSettings()}}

	max, ramp := time.Second*5, time.Second*5/10

	for _, tc := range []struct {
		exp   time.Duration
		stats Stats
		max   time.Duration
	}{
		{0, Stats{}, PreIngestDefaultDelayLimit},
		{0, Stats{L0FileCount: 19}, PreIngestDefaultDelayLimit},
		{0, Stats{L0FileCount: 20}, PreIngestDefaultDelayLimit},
		{ramp, Stats{L0FileCount: 21}, PreIngestDefaultDelayLimit},
		{ramp * 2, Stats{L0FileCount: 22}, PreIngestDefaultDelayLimit},
		{max, Stats{L0FileCount: 35}, PreIngestDefaultDelayLimit},
		{max, Stats{L0FileCount: 55}, PreIngestDefaultDelayLimit},
		{time.Second * 2, Stats{L0FileCount: 35}, time.Second * 2},
		{0, Stats{PendingCompactionBytesEstimate: 20 << 30}, PreIngestDefaultDelayLimit},
		{time.Second * 2, Stats{PendingCompactionBytesEstimate: 80 << 30}, time.Second * 2},
		{max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}, PreIngestDefaultDelayLimit},
		{time.Second * 2, Stats{L0FileCount: 15, PendingCompactionBytesEstimate: 80 << 30}, time.Second * 2},
		{time.Second * 2, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}, time.Second * 2},
		{max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}, PreIngestDefaultDelayLimit},
	} {
		require.Equal(t, tc.exp, r.calculatePreIngestDelay(&tc.stats, tc.max))
	}
}
4 changes: 4 additions & 0 deletions pkg/storage/replica_proposal.go
@@ -390,6 +390,10 @@ func addSSTablePreApply(
log.Fatalf(ctx, "sideloaded SSTable at term %d, index %d is missing", term, index)
}

if r, ok := eng.(*engine.RocksDB); ok {
r.PreIngestDelay(ctx, engine.PreIngestDefaultDelayLimit)
}

// as of VersionUnreplicatedRaftTruncatedState we were on rocksdb 5.17 so this
// cluster version should indicate that we will never use rocksdb < 5.16 to
// read these SSTs, so it is safe to use https://github.com/facebook/rocksdb/pull/4172
4 changes: 4 additions & 0 deletions pkg/storage/store.go
@@ -2764,6 +2764,10 @@ func (s *Store) Send(
return nil, roachpb.NewError(err)
}
defer s.limiters.ConcurrentAddSSTableRequests.Finish()
const maxDelay = time.Second * 15
if rocks, ok := s.engine.(*engine.RocksDB); ok {
rocks.PreIngestDelay(ctx, maxDelay)
}
}

if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
