Commit

storage: make queue timeouts controllable, snapshot sending queues dynamic

In #42686 we made the raft snapshot queue timeout dynamic and based on the
size of the snapshot being sent. We also added an escape hatch to control
the timeout of processing of that queue. This change generalizes that
cluster setting to apply to all of the queues.

It so happens that the replicate queue and the merge queue also sometimes
need to send snapshots. This PR gives them similar treatment to the
raft snapshot queue.

The previous cluster setting was never released and is reserved, so it does not
need a release note.

Release note: None
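As a rough, self-contained sketch (not part of the commit) of the rule the queues now share: processing gets at least the guaranteed budget, and otherwise ten times the duration a snapshot of the range would take at the configured maximum rate. The constant mirrors permittedSnapshotSlowdown introduced in queue.go below; snapshotTimeout is an invented helper name and the concrete sizes and rates are hypothetical.

package main

import (
	"fmt"
	"time"
)

// permittedSnapshotSlowdown mirrors the constant added in pkg/storage/queue.go.
const permittedSnapshotSlowdown = 10

// snapshotTimeout sketches the shared rule: never less than the guaranteed
// budget (kv.queue.process.guaranteed_time_budget), otherwise 10x the time a
// snapshot of the range would take at the configured max rate.
func snapshotTimeout(budget time.Duration, rangeBytes, rateBytesPerSec int64) time.Duration {
	estimated := time.Duration(rangeBytes/rateBytesPerSec) * time.Second
	timeout := estimated * permittedSnapshotSlowdown
	if timeout < budget {
		timeout = budget
	}
	return timeout
}

func main() {
	// A 512 MiB range at 8 MiB/s takes ~64s to send; with 10x leeway the
	// timeout is 640s, well above the 1m budget passed here.
	fmt.Println(snapshotTimeout(time.Minute, 512<<20, 8<<20)) // 10m40s
	// A tiny range falls back to the guaranteed budget.
	fmt.Println(snapshotTimeout(time.Minute, 1<<20, 8<<20)) // 1m0s
}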
ajwerner committed Feb 11, 2020
1 parent 8dcc132 commit 519eba8
Showing 7 changed files with 154 additions and 64 deletions.
16 changes: 14 additions & 2 deletions pkg/storage/merge_queue.go
@@ -101,8 +101,20 @@ func newMergeQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *mergeQue
mq.baseQueue = newBaseQueue(
"merge", mq, store, gossip,
queueConfig{
maxSize: defaultQueueMaxSize,
maxConcurrency: mergeQueueConcurrency,
maxSize: defaultQueueMaxSize,
maxConcurrency: mergeQueueConcurrency,
// TODO(ajwerner): Sometimes the merge queue needs to send multiple
// snapshots, but the timeout function here is configured based on the
// duration required to send a single snapshot. That being said, this
// timeout provides leeway for snapshots to be 10x slower than the
// specified rate and still respects the queue processing minimum timeout.
// While using the below function is certainly better than just using the
// default timeout, it would be better to have a function which takes into
// account how many snapshots processing will need to send. That might be
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
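The TODO above notes that the merge queue may need to send more than one snapshot per processing pass. Purely as a hypothetical sketch of the alternative it mentions (nothing below exists in the tree, and makeReplicationAwareTimeoutFunc is an invented name), one could wrap the per-snapshot timeout helpers introduced in pkg/storage/queue.go below and scale them by an assumed snapshot count:

// Hypothetical, for illustration only; this would live alongside
// makeQueueSnapshotTimeoutFunc and queueProcessTimeoutFunc in
// pkg/storage/queue.go.
func makeReplicationAwareTimeoutFunc(
	rateSetting *settings.ByteSizeSetting, assumedSnapshots int64,
) queueProcessTimeoutFunc {
	perSnapshot := makeQueueSnapshotTimeoutFunc(rateSetting)
	return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
		// Give each anticipated snapshot the full per-snapshot budget.
		return time.Duration(assumedSnapshots) * perSnapshot(cs, r)
	}
}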
61 changes: 57 additions & 4 deletions pkg/storage/queue.go
@@ -20,6 +20,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/causer"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
@@ -44,10 +47,56 @@ const (
defaultQueueMaxSize = 10000
)

func defaultProcessTimeoutFunc(replicaInQueue) time.Duration {
return defaultProcessTimeout
// queueGuaranteedProcessingTimeBudget is the minimum duration that the
// processing of a queue is given before it may time out. It is an escape
// hatch to raise the timeout for queues.
var queueGuaranteedProcessingTimeBudget = settings.RegisterDurationSetting(
"kv.queue.process.guaranteed_time_budget",
"the guaranteed duration before which the processing of a queue may "+
"time out",
defaultProcessTimeout,
)

func init() {
queueGuaranteedProcessingTimeBudget.SetConfidential()
}

func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Duration {
return queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
}

// makeQueueSnapshotTimeoutFunc returns a timeout function for queues which
// send snapshots while processing. The timeout scales with the size of the
// range and the maximum allowed rate of data transfer, and never falls below
// the minimum specified by the cluster setting above.
//
// The rateSetting parameter controls which snapshot rate to use.
func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats })
if !ok {
return minimumTimeout
}
snapshotRate := rateSetting.Get(&cs.SV)
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
timeout := estimatedDuration * permittedSnapshotSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
}
return timeout
}
}

// permittedSnapshotSlowdown is the factor by which a snapshot is allowed to
// be slower than the estimated duration implied by the configured snapshot
// rate when computing the snapshot's timeout.
const permittedSnapshotSlowdown = 10

// a purgatoryError indicates a replica processing failure which indicates
// the replica can be placed into purgatory for faster retries when the
// failure condition changes.
@@ -222,6 +271,10 @@ type queueImpl interface {
purgatoryChan() <-chan time.Time
}

// queueProcessTimeoutFunc controls the timeout for queue processing for a
// replicaInQueue.
type queueProcessTimeoutFunc func(*cluster.Settings, replicaInQueue) time.Duration

type queueConfig struct {
// maxSize is the maximum number of replicas to queue.
maxSize int
@@ -258,7 +311,7 @@ type queueConfig struct {
// that have been destroyed but not GCed.
processDestroyedReplicas bool
// processTimeout returns the timeout for processing a replica.
processTimeoutFunc func(replicaInQueue) time.Duration
processTimeoutFunc queueProcessTimeoutFunc
// successes is a counter of replicas processed successfully.
successes *metric.Counter
// failures is a counter of replicas which failed processing.
@@ -855,7 +908,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er
ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name)
defer span.Finish()
return contextutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()),
bq.processTimeoutFunc(repl), func(ctx context.Context) error {
bq.processTimeoutFunc(bq.store.ClusterSettings(), repl), func(ctx context.Context) error {
log.VEventf(ctx, 1, "processing replica")

if !repl.IsInitialized() {
5 changes: 3 additions & 2 deletions pkg/storage/queue_concurrency_test.go
@@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -30,8 +31,8 @@ import (
"golang.org/x/sync/errgroup"
)

func constantTimeoutFunc(d time.Duration) func(replicaInQueue) time.Duration {
return func(replicaInQueue) time.Duration { return d }
func constantTimeoutFunc(d time.Duration) func(*cluster.Settings, replicaInQueue) time.Duration {
return func(*cluster.Settings, replicaInQueue) time.Duration { return d }
}

// TestBaseQueueConcurrent verifies that under concurrent adds/removes of ranges
62 changes: 61 additions & 1 deletion pkg/storage/queue_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -886,6 +887,65 @@ func TestBaseQueueProcessTimeout(t *testing.T) {
})
}

type mvccStatsReplicaInQueue struct {
replicaInQueue
size int64
}

func (r mvccStatsReplicaInQueue) GetMVCCStats() enginepb.MVCCStats {
return enginepb.MVCCStats{ValBytes: r.size}
}

func TestQueueSnapshotTimeoutFunc(t *testing.T) {
defer leaktest.AfterTest(t)()
type testCase struct {
guaranteedProcessingTime time.Duration
snapshotRate int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
makeTest := func(tc testCase) (string, func(t *testing.T)) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(&st.SV, tc.guaranteedProcessingTime)
recoverySnapshotRate.Override(&st.SV, tc.snapshotRate)
tf := makeQueueSnapshotTimeoutFunc(recoverySnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
require.Equal(t, tc.expectedTimeout, tf(st, repl))
}
}
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
},
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedSnapshotSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
snapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
},
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 10,
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedSnapshotSlowdown,
},
} {
t.Run(makeTest(tc))
}
}

// processTimeQueueImpl spends 5ms on each process request.
type processTimeQueueImpl struct {
testQueueImpl
@@ -930,7 +990,7 @@ func TestBaseQueueTimeMetric(t *testing.T) {
if v := bq.successes.Count(); v != 1 {
return errors.Errorf("expected 1 processed replicas; got %d", v)
}
if min, v := bq.queueConfig.processTimeoutFunc(nil), bq.processingNanos.Count(); v < min.Nanoseconds() {
if min, v := bq.queueConfig.processTimeoutFunc(nil, nil), bq.processingNanos.Count(); v < min.Nanoseconds() {
return errors.Errorf("expected >= %s in processing time; got %s", min, time.Duration(v))
}
return nil
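To cross-check the expected values in TestQueueSnapshotTimeoutFunc above, here is a self-contained sketch of the arithmetic (the 10x factor mirrors permittedSnapshotSlowdown; nothing here is part of the commit):

package main

import (
	"fmt"
	"time"
)

func main() {
	const slowdown = 10 // mirrors permittedSnapshotSlowdown
	// 100 MiB at 1 MiB/s: a 100s estimate becomes 1000s with 10x leeway,
	// which beats a one-minute budget but not an hour-long one.
	fmt.Println(time.Duration((100<<20)/(1<<20)) * time.Second * slowdown) // 16m40s
	// 100 MiB at 1 KiB/s: a 102400s estimate becomes 1024000s with leeway.
	fmt.Println(time.Duration((100<<20)/(1<<10)) * time.Second * slowdown) // 284h26m40s
}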
53 changes: 5 additions & 48 deletions pkg/storage/raft_snapshot_queue.go
@@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -33,24 +32,6 @@ const (
raftSnapshotPriority float64 = 0
)

// raftSnapshotQueueMinimumTimeout is the minimum duration after which the
// processing of the raft snapshot queue will time out. See the timeoutFunc
// in newRaftSnapshotQueue.
var raftSnapshotQueueMinimumTimeout = settings.RegisterDurationSetting(
// NB: this setting has a relatively awkward name because the linter does not
// permit `minimum_timeout` but rather asks for it to be `.minimum.timeout`.
"kv.raft_snapshot_queue.process.timeout_minimum",
"minimum duration after which the processing of the raft snapshot queue will "+
"time out; it is an escape hatch to raise the minimum timeout for sending "+
"a raft snapshot which is sent much more slowly than the allowable rate "+
"as specified by kv.snapshot_recovery.max_rate",
defaultProcessTimeout,
)

func init() {
raftSnapshotQueueMinimumTimeout.SetConfidential()
}

// raftSnapshotQueue manages a queue of replicas which may need to catch a
// replica up with a snapshot to their range.
type raftSnapshotQueue struct {
@@ -70,35 +51,11 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
// Create a timeout which is a function of the size of the range and the
// maximum allowed rate of data transfer that adheres to a minimum timeout
// specified in a cluster setting.
processTimeoutFunc: func(r replicaInQueue) (d time.Duration) {
minimumTimeout := raftSnapshotQueueMinimumTimeout.Get(&store.ClusterSettings().SV)
// NB: In production code this will type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
repl, ok := r.(*Replica)
if !ok {
return minimumTimeout
}
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
snapshotRecoveryRate := recoverySnapshotRate.Get(&store.ClusterSettings().SV)
estimatedDuration := time.Duration(totalBytes / snapshotRecoveryRate)
// Set a timeout to 1/10th of the allowed throughput.
const permittedSlowdown = 10
timeout := estimatedDuration * permittedSlowdown

if timeout < minimumTimeout {
timeout = minimumTimeout
}
return timeout
},
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos,
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(recoverySnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos,
},
)
return rq
15 changes: 10 additions & 5 deletions pkg/storage/replicate_queue.go
@@ -155,11 +155,16 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *rep
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: store.TestingKnobs().ReplicateQueueAcceptsUnsplit,
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
purgatory: store.metrics.ReplicateQueuePurgatory,
// The processing of the replicate queue often needs to send snapshots, so
// we use the same snapshot-based timeout function as the raft snapshot
// queue. It sets a timeout based on the range size and the sending rate,
// in addition to consulting the setting which controls the minimum timeout.
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
purgatory: store.metrics.ReplicateQueuePurgatory,
},
)

6 changes: 4 additions & 2 deletions pkg/storage/store_rebalancer.go
@@ -244,7 +244,8 @@ func (sr *StoreRebalancer) rebalanceStore(

log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load",
replWithStats.repl.RangeID, replWithStats.qps, target.StoreID)
if err := contextutil.RunWithTimeout(ctx, "transfer lease", sr.rq.processTimeoutFunc(replWithStats.repl), func(ctx context.Context) error {
timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "transfer lease", timeout, func(ctx context.Context) error {
return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps)
}); err != nil {
log.Errorf(ctx, "unable to transfer lease to s%d: %+v", target.StoreID, err)
@@ -303,7 +304,8 @@ func (sr *StoreRebalancer) rebalanceStore(
descBeforeRebalance := replWithStats.repl.Desc()
log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load",
replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas(), targets)
if err := contextutil.RunWithTimeout(ctx, "relocate range", sr.rq.processTimeoutFunc(replWithStats.repl), func(ctx context.Context) error {
timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "relocate range", timeout, func(ctx context.Context) error {
return sr.rq.store.AdminRelocateRange(ctx, *descBeforeRebalance, targets)
}); err != nil {
log.Errorf(ctx, "unable to relocate range to %v: %+v", targets, err)
