Merge pull request #36738 from dt/backport19.1-34258
release-19.1: storage: backpressure SSTs before other writes if L0 fills up
vivekmenezes authored Apr 11, 2019
2 parents b652751 + 639445d commit 134478e
Showing 8 changed files with 118 additions and 0 deletions.
4 changes: 4 additions & 0 deletions c-deps/libroach/engine.cc
@@ -203,6 +203,9 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
rep->GetIntProperty("rocksdb.estimate-pending-compaction-bytes",
&pending_compaction_bytes_estimate);

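// RocksDB exposes the current L0 file count as the string-valued property
// "rocksdb.num-files-at-level0"; it is parsed into the stats result below.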
std::string l0_file_count_str;
rep->GetProperty("rocksdb.num-files-at-level0", &l0_file_count_str);

stats->block_cache_hits = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_HIT);
stats->block_cache_misses = (int64_t)s->getTickerCount(rocksdb::BLOCK_CACHE_MISS);
stats->block_cache_usage = (int64_t)block_cache->GetUsage();
@@ -216,6 +219,7 @@ DBStatus DBImpl::GetStats(DBStatsResult* stats) {
stats->compactions = (int64_t)event_listener->GetCompactions();
stats->table_readers_mem_estimate = table_readers_mem_estimate;
stats->pending_compaction_bytes_estimate = pending_compaction_bytes_estimate;
stats->l0_file_count = std::atoi(l0_file_count_str.c_str());
return kSuccess;
}

1 change: 1 addition & 0 deletions c-deps/libroach/include/libroach.h
@@ -348,6 +348,7 @@ typedef struct {
int64_t compactions;
int64_t table_readers_mem_estimate;
int64_t pending_compaction_bytes_estimate;
int64_t l0_file_count;
} DBStatsResult;

typedef struct {
4 changes: 4 additions & 0 deletions docs/generated/settings/settings.html
@@ -55,6 +55,10 @@
<tr><td><code>kv.transaction.write_pipelining_enabled</code></td><td>boolean</td><td><code>true</code></td><td>if enabled, transactional writes are pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_batch_size</code></td><td>integer</td><td><code>128</code></td><td>if non-zero, defines the maximum batch size that will be pipelined through Raft consensus</td></tr>
<tr><td><code>kv.transaction.write_pipelining_max_outstanding_size</code></td><td>byte size</td><td><code>256 KiB</code></td><td>maximum number of bytes used to track in-flight pipelined writes before disabling pipelining</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.delay_l0_file</code></td><td>duration</td><td><code>200ms</code></td><td>delay to add to SST ingestions per file in L0 over the configured limit</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.l0_file_count_threshold</code></td><td>integer</td><td><code>20</code></td><td>number of L0 files after which to backpressure SST ingestions</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.max_delay</code></td><td>duration</td><td><code>5s</code></td><td>maximum amount of time to backpressure a single SST ingestion</td></tr>
<tr><td><code>rocksdb.ingest_backpressure.pending_compaction_threshold</code></td><td>byte size</td><td><code>64 GiB</code></td><td>pending compaction estimate above which to backpressure SST ingestions</td></tr>
<tr><td><code>rocksdb.min_wal_sync_interval</code></td><td>duration</td><td><code>0s</code></td><td>minimum duration between syncs of the RocksDB WAL</td></tr>
<tr><td><code>schemachanger.backfiller.buffer_size</code></td><td>byte size</td><td><code>196 MiB</code></td><td>amount to buffer in memory during backfills</td></tr>
<tr><td><code>schemachanger.backfiller.max_sst_size</code></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for ingested files during backfills</td></tr>
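These four settings are the tuning knobs for the new ingestion backpressure path; they are registered in pkg/storage/engine/rocksdb.go below. As a rough sketch of how an in-package test might tighten them, assuming the settings objects expose the usual Override test helpers (an assumption; the exact signatures are not shown in this diff):

// Sketch only: tighten the backpressure knobs for a test. Override's
// signature is assumed here, not confirmed by this diff.
st := cluster.MakeTestingClusterSettings()
ingestDelayL0Threshold.Override(&st.SV, 10)      // backpressure after 10 L0 files
ingestDelayTime.Override(&st.SV, 10*time.Second) // allow up to 10s of delay per ingestion
cfg := RocksDBConfig{Settings: st}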
5 changes: 5 additions & 0 deletions pkg/storage/engine/engine.go
@@ -307,6 +307,10 @@ type Engine interface {
// case of hard-links) when allowFileModifications is true. See additional
// comments in db.cc's IngestExternalFile explaining modification behavior.
IngestExternalFiles(ctx context.Context, paths []string, skipWritingSeqNo, allowFileModifications bool) error
// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
PreIngestDelay(ctx context.Context)
// ApproximateDiskBytes returns an approximation of the on-disk size for the given key span.
ApproximateDiskBytes(from, to roachpb.Key) (uint64, error)
// CompactRange ensures that the specified range of key value pairs is
@@ -404,6 +408,7 @@ type Stats struct {
Compactions int64
TableReadersMemEstimate int64
PendingCompactionBytesEstimate int64
L0FileCount int64
}

// EnvStats is a set of RocksDB env stats, including encryption status.
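Taken together with IngestExternalFiles above, the intended ordering is: backpressure first, then ingest. A minimal sketch (ingestWithBackpressure is a hypothetical wrapper for illustration; the real call sites are the replica_proposal.go and store.go hunks below):

// Hypothetical wrapper: delay while the engine is unhealthy, then
// atomically link the SSTs into the LSM.
func ingestWithBackpressure(ctx context.Context, eng Engine, paths []string) error {
	// May block for up to rocksdb.ingest_backpressure.max_delay if L0 has
	// too many files or the pending-compaction estimate is over its limit.
	eng.PreIngestDelay(ctx)
	return eng.IngestExternalFiles(ctx, paths, false /* skipWritingSeqNo */, true /* allowFileModifications */)
}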
76 changes: 76 additions & 0 deletions pkg/storage/engine/rocksdb.go
@@ -80,6 +80,30 @@ var rocksdbConcurrency = envutil.EnvOrDefaultInt(
return max
}())

var ingestDelayL0Threshold = settings.RegisterIntSetting(
"rocksdb.ingest_backpressure.l0_file_count_threshold",
"number of L0 files after which to backpressure SST ingestions",
20,
)

var ingestDelayPerFile = settings.RegisterDurationSetting(
"rocksdb.ingest_backpressure.delay_l0_file",
"delay to add to SST ingestions per file in L0 over the configured limit",
time.Millisecond*200,
)

var ingestDelayPendingLimit = settings.RegisterByteSizeSetting(
"rocksdb.ingest_backpressure.pending_compaction_threshold",
"pending compaction estimate above which to backpressure SST ingestions",
64<<30,
)

var ingestDelayTime = settings.RegisterDurationSetting(
"rocksdb.ingest_backpressure.max_delay",
"maximum amount of time to backpressure a single SST ingestion",
time.Second*5,
)

// Set to true to perform expensive iterator debug leak checking. In normal
// operation, we perform inexpensive iterator leak checking but those checks do
// not indicate where the leak arose. The expensive checking tracks the stack
@@ -1195,6 +1219,7 @@ func (r *RocksDB) GetStats() (*Stats, error) {
Compactions: int64(s.compactions),
TableReadersMemEstimate: int64(s.table_readers_mem_estimate),
PendingCompactionBytesEstimate: int64(s.pending_compaction_bytes_estimate),
L0FileCount: int64(s.l0_file_count),
}, nil
}

@@ -2966,6 +2991,57 @@ func (r *RocksDB) setAuxiliaryDir(d string) error {
return nil
}

// PreIngestDelay may choose to block for some duration if L0 has an excessive
// number of files in it or if PendingCompactionBytesEstimate is elevated. It
// is intended to be called before ingesting a new SST, since we'd rather
// backpressure the bulk operation adding SSTs than slow down the whole RocksDB
// instance and impact all foreground traffic by adding too many files to it.
// After the number of L0 files exceeds the configured limit, it gradually
// begins delaying more for each additional file in L0 over the limit until
// hitting its configured (via settings) maximum delay. If the pending
// compaction limit is exceeded, it waits for the maximum delay.
func (r *RocksDB) PreIngestDelay(ctx context.Context) {
if r.cfg.Settings == nil {
return
}
stats, err := r.GetStats()
if err != nil {
log.Warningf(ctx, "failed to read stats: %+v", err)
return
}
targetDelay := calculatePreIngestDelay(r.cfg, stats)

if targetDelay == 0 {
return
}
log.VEventf(ctx, 2, "delaying SST ingestion %s. %d L0 files, %db pending compaction", targetDelay, stats.L0FileCount, stats.PendingCompactionBytesEstimate)

select {
case <-time.After(targetDelay):
case <-ctx.Done():
}
}

func calculatePreIngestDelay(cfg RocksDBConfig, stats *Stats) time.Duration {
maxDelay := ingestDelayTime.Get(&cfg.Settings.SV)
l0Filelimit := ingestDelayL0Threshold.Get(&cfg.Settings.SV)
compactionLimit := ingestDelayPendingLimit.Get(&cfg.Settings.SV)

if stats.PendingCompactionBytesEstimate >= compactionLimit {
return maxDelay
}
const ramp = 10
if stats.L0FileCount > l0Filelimit {
delayPerFile := maxDelay / time.Duration(ramp)
targetDelay := time.Duration(stats.L0FileCount-l0Filelimit) * delayPerFile
if targetDelay > maxDelay {
return maxDelay
}
return targetDelay
}
return 0
}
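
// For illustration, with the defaults registered above: maxDelay = 5s and
// ramp = 10 give delayPerFile = 500ms, so 22 L0 files (2 over the 20-file
// threshold) delay an ingestion by 1s, 30 or more files yield the full 5s,
// and a pending compaction estimate of 64 GiB or more returns 5s outright.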

// IngestExternalFiles atomically links a slice of files into the RocksDB
// log-structured merge-tree.
func (r *RocksDB) IngestExternalFiles(
25 changes: 25 additions & 0 deletions pkg/storage/engine/rocksdb_test.go
@@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

const testCacheSize = 1 << 30 // 1 GB
@@ -1738,3 +1739,27 @@ func TestRocksDBWALFileEmptyBatch(t *testing.T) {
}
})
}

func TestIngestDelayLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
cfg := RocksDBConfig{Settings: cluster.MakeTestingClusterSettings()}

max, ramp := time.Second*5, time.Second*5/10

for _, tc := range []struct {
exp time.Duration
stats Stats
}{
{0, Stats{}},
{0, Stats{L0FileCount: 19}},
{0, Stats{L0FileCount: 20}},
{ramp, Stats{L0FileCount: 21}},
{ramp * 2, Stats{L0FileCount: 22}},
{max, Stats{L0FileCount: 55}},
{0, Stats{PendingCompactionBytesEstimate: 20 << 30}},
{max, Stats{L0FileCount: 25, PendingCompactionBytesEstimate: 80 << 30}},
{max, Stats{L0FileCount: 35, PendingCompactionBytesEstimate: 20 << 30}},
} {
require.Equal(t, tc.exp, calculatePreIngestDelay(cfg, &tc.stats))
}
}
2 changes: 2 additions & 0 deletions pkg/storage/replica_proposal.go
@@ -390,6 +390,8 @@ func addSSTablePreApply(
log.Fatalf(ctx, "sideloaded SSTable at term %d, index %d is missing", term, index)
}

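// Backpressure below Raft, immediately before ingesting the sideloaded SST:
// a backlogged L0 delays SST application here rather than slowing the whole
// engine for foreground traffic.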
eng.PreIngestDelay(ctx)

// as of VersionUnreplicatedRaftTruncatedState we were on rocksdb 5.17 so this
// cluster version should indicate that we will never use rocksdb < 5.16 to
// read these SSTs, so it is safe to use https://github.com/facebook/rocksdb/pull/4172
1 change: 1 addition & 0 deletions pkg/storage/store.go
@@ -2759,6 +2759,7 @@ func (s *Store) Send(
return nil, roachpb.NewError(err)
}
defer s.limiters.ConcurrentAddSSTableRequests.Finish()
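// Beyond capping the number of concurrent AddSSTable requests, delay this
// request up front if the engine reports an overfull L0 or a large pending
// compaction backlog.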
s.engine.PreIngestDelay(ctx)
}

if err := ba.SetActiveTimestamp(s.Clock().Now); err != nil {
