Skip to content

Commit

Permalink
kvserver,kvflowcontrol: integrate flow control
Browse files Browse the repository at this point in the history
Part of cockroachdb#95563. This PR integrates various kvflowcontrol components into
the critical path for replication traffic. It does so by introducing two
"integration interfaces" in the kvserver package to intercept various
points of a replica's lifecycle, using it to manage the underlying
replication streams and flow tokens. The integration is mediated through
two cluster settings:

  - kvadmission.flow_control.enabled
      This is a top-level kill-switch to revert to pre-kvflowcontrol
      behavior where follower writes unilaterally deducted IO tokens
      without blocking.

  - kvadmission.flow_control.mode
      It can take on one of three settings, each exercising the flow
      control machinery to varying degrees.

      - apply_to_elastic
          Only applies admission delays to elastic traffic.

      - apply_to_all
          Applies admission delays to {regular,elastic} traffic.

When the mode is changed, we simply admit all waiting requests. This
risks possibly over-admitting work, but that's ok -- we assume these
mode changes are rare events and done under supervision. These
settings are hooked into in the kvadmission and kvflowcontroller
packages. As for the actual integration interfaces in kvserver, they
are:

  - replicaFlowControlIntegration: used to integrate with replication
    flow control. It's intercepts various points in a replica's
    lifecycle, like it acquiring raft leadership or losing it, or its
    raft membership changing, etc. Accessing it requires Replica.mu to
    be held, exclusively (this is asserted on in the canonical
    implementation).

      type replicaFlowControlIntegration interface {
        handle() (kvflowcontrol.Handle, bool)
        onBecameLeader(context.Context)
        onBecameFollower(context.Context)
        onDescChanged(context.Context)
        onFollowersPaused(context.Context)
        onReplicaDestroyed(context.Context)
        onProposalQuotaUpdated(context.Context)
      }

  - replicaForFlowControl abstracts the interface of an individual
    Replica, as needed by replicaFlowControlIntegration.

      type replicaForFlowControl interface {
        assertLocked()
        annotateCtx(context.Context) context.Context
        getTenantID() roachpb.TenantID
        getReplicaID() roachpb.ReplicaID
        getRangeID() roachpb.RangeID
        getDescriptor() *roachpb.RangeDescriptor
        pausedFollowers() map[roachpb.ReplicaID]struct{}
        isFollowerActive(context.Context, roachpb.ReplicaID) bool
        appliedLogPosition() kvflowcontrolpb.RaftLogPosition
        withReplicaProgress(f func(roachpb.ReplicaID, rafttracker.Progress))
      }

Release note: None
  • Loading branch information
irfansharif committed Jun 6, 2023
1 parent 4241f6b commit 8029f27
Show file tree
Hide file tree
Showing 53 changed files with 1,667 additions and 128 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -300,4 +300,4 @@ trace.opentelemetry.collector string address of an OpenTelemetry trace collecto
trace.snapshot.rate duration 0s if non-zero, interval at which background trace snapshots are captured tenant-rw
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez tenant-rw
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. tenant-rw
version version 1000023.1-8 set the active cluster version in the format '<major>.<minor>' tenant-rw
version version 1000023.1-10 set the active cluster version in the format '<major>.<minor>' tenant-rw
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,6 @@
<tr><td><div id="setting-trace-snapshot-rate" class="anchored"><code>trace.snapshot.rate</code></div></td><td>duration</td><td><code>0s</code></td><td>if non-zero, interval at which background trace snapshots are captured</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-8</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000023.1-10</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
8 changes: 8 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,10 @@ const (
// the system tenant.
V23_2_EnableRangeCoalescingForSystemTenant

// V23_2_UseACRaftEntryEntryEncodings gates the use of raft entry encodings
// that (optionally) embed below-raft admission data.
V23_2_UseACRaftEntryEntryEncodings

// *************************************************
// Step (1) Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -935,6 +939,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_2_EnableRangeCoalescingForSystemTenant,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 8},
},
{
Key: V23_2_UseACRaftEntryEntryEncodings,
Version: roachpb.Version{Major: 23, Minor: 1, Internal: 10},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go_library(
"debug_print.go",
"doc.go",
"flow_control_raft_transport.go",
"flow_control_replica.go",
"flow_control_replica_integration.go",
"flow_control_stores.go",
"lease_history.go",
"markers.go",
Expand Down Expand Up @@ -141,6 +143,7 @@ go_library(
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/kvstorage",
Expand Down
107 changes: 107 additions & 0 deletions pkg/kv/kvserver/flow_control_replica.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"go.etcd.io/raft/v3"
rafttracker "go.etcd.io/raft/v3/tracker"
)

// replicaForFlowControl abstracts the interface of an individual Replica, as
// needed by replicaFlowControlIntegration.
type replicaForFlowControl interface {
annotateCtx(context.Context) context.Context
getTenantID() roachpb.TenantID
getReplicaID() roachpb.ReplicaID
getRangeID() roachpb.RangeID
getDescriptor() *roachpb.RangeDescriptor
getAppliedLogPosition() kvflowcontrolpb.RaftLogPosition
getPausedFollowers() map[roachpb.ReplicaID]struct{}
isFollowerLive(context.Context, roachpb.ReplicaID) bool
isRaftTransportConnectedTo(roachpb.StoreID) bool
withReplicaProgress(f func(roachpb.ReplicaID, rafttracker.Progress))

assertLocked() // only affects test builds
}

// replicaFlowControl is a concrete implementation of the replicaForFlowControl
// interface.
type replicaFlowControl Replica

var _ replicaForFlowControl = &replicaFlowControl{}

func (rf *replicaFlowControl) assertLocked() {
rf.mu.AssertHeld()
}

func (rf *replicaFlowControl) annotateCtx(ctx context.Context) context.Context {
return rf.AnnotateCtx(ctx)
}

func (rf *replicaFlowControl) getTenantID() roachpb.TenantID {
rf.assertLocked()
return rf.mu.tenantID
}

func (rf *replicaFlowControl) getReplicaID() roachpb.ReplicaID {
return rf.replicaID
}

func (rf *replicaFlowControl) getRangeID() roachpb.RangeID {
return rf.RangeID
}

func (rf *replicaFlowControl) getDescriptor() *roachpb.RangeDescriptor {
rf.assertLocked()
r := (*Replica)(rf)
return r.descRLocked()
}

func (rf *replicaFlowControl) getPausedFollowers() map[roachpb.ReplicaID]struct{} {
rf.assertLocked()
return rf.mu.pausedFollowers
}

func (rf *replicaFlowControl) isFollowerLive(ctx context.Context, replID roachpb.ReplicaID) bool {
rf.mu.AssertHeld()
return rf.mu.lastUpdateTimes.isFollowerActiveSince(
ctx,
replID,
timeutil.Now(),
rf.store.cfg.RangeLeaseDuration,
)
}

func (rf *replicaFlowControl) isRaftTransportConnectedTo(storeID roachpb.StoreID) bool {
rf.mu.AssertHeld()
return rf.store.cfg.Transport.isConnectedTo(storeID)
}

func (rf *replicaFlowControl) getAppliedLogPosition() kvflowcontrolpb.RaftLogPosition {
rf.mu.AssertHeld()
status := rf.mu.internalRaftGroup.BasicStatus()
return kvflowcontrolpb.RaftLogPosition{
Term: status.Term,
Index: status.Applied,
}
}

func (rf *replicaFlowControl) withReplicaProgress(f func(roachpb.ReplicaID, rafttracker.Progress)) {
rf.mu.AssertHeld()
rf.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress rafttracker.Progress) {
f(roachpb.ReplicaID(id), progress)
})
}
Loading

0 comments on commit 8029f27

Please sign in to comment.