Skip to content

Commit

Permalink
kvflow{handle,tokentracker,simulator}: implement kvflowcontrol.Handle
Browse files Browse the repository at this point in the history
Part of #95563.

kvflowcontrol.Handle is used to interface with replication flow control;
it's typically backed by a node-level kvflowcontrol.Controller. Handles
are held on replicas initiating replication traffic, i.e. are both the
leaseholder and raft leader, and manage multiple streams underneath
(typically one per active member of the raft group).

When replicating log entries, these replicas choose the log position
(term+index) the data is to end up at, and use this handle to track the
token deductions on a per log position basis. When informed of admitted
log entries on the receiving end of the stream, we free up tokens by
specifying the highest log position up to which we've admitted
(below-raft admission, for a given priority, takes log position into
account -- see kvflowcontrolpb.AdmittedRaftLogEntries for more details).

We also extend the testing framework introduced in #95905 to also
support writing tests for kvflowcontrol.Handle -- it's now pulled into
its own kvflowsimulator package. We're able to write tests that look
like the following:

    # Set up a triply connected handle (to s1, s2, s3) and start issuing
    # writes at 1MiB/s. For two of the streams, return tokens at exactly
    # the rate its being deducted (1MiB/s). For the third stream (s3),
    # we return flow tokens at only 0.5MiB/s.
    timeline
    t=0s         handle=h op=connect    stream=t1/s1   log-position=1/0
    t=0s         handle=h op=connect    stream=t1/s2   log-position=1/0
    t=0s         handle=h op=connect    stream=t1/s3   log-position=1/0
    t=[0s,50s)   handle=h class=regular adjust=-1MiB/s   rate=10/s
    t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s1
    t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s2
    t=[0.2s,50s) handle=h class=regular adjust=+0.5MiB/s rate=10/s stream=t1/s3
    ----

    # Observe:
    # - Total available tokens flatlines at 32MiB since flow tokens for
    #   s3 eventually depletes and later bounces off of 0MiB. We
    #   initially have 3*16MiB = 48MiB worth of flow tokens, and end
    #   up at 48MiB-16MiB = 32MiB.
    # - Initially the rate of token deductions (3*1MiB/s = 3MiB/s) is
    #   higher than the token returns (1MiB/s+1MiB/s+0.5MiB/s =
    #   2.5MiB/s), but after we start shaping it to the slowest
    #   stream, they end up matching at (0.5MiB/s*3 = 1.5MiB/s).
    # - The blocked stream count bounces between 0 and 1 as the s3
    #   stream gets blocked/unblocked as tokens are
    #   deducted/returned. The demand for tokens (1MiB/s) is higher
    #   than the replenishment rate (0.5MiB/s).
    # - The overall admission rate is reduced from 30 reqs/s to 25
    #   reqs/s, mapping to token deduction rate of 3MiB/s to 2.5MiB/s
    #   (1MiB/s divvied across 10 reqs). The difference between 30
    #   reqs/s and 25 reqs/s is found in the +5 reqs/s accumulating in
    #   the wait queue.
    plot
    kvadmission.flow_controller.regular_tokens_available            unit=MiB
    kvadmission.flow_controller.regular_tokens_{deducted,returned}  unit=MiB/s rate=true
    kvadmission.flow_controller.regular_blocked_stream_count        unit=streams
    kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true
    ----
    ----
     47.7 ┼╮
     46.6 ┤╰─╮
     45.6 ┤  ╰─╮
     44.5 ┤    ╰╮
     43.5 ┤     ╰─╮
     42.4 ┤       ╰╮
     41.4 ┤        ╰─╮
     40.3 ┤          ╰─╮
     39.3 ┤            ╰╮
     38.2 ┤             ╰─╮
     37.2 ┤               ╰─╮
     36.1 ┤                 ╰╮
     35.1 ┤                  ╰─╮
     34.0 ┤                    ╰─╮
     33.0 ┤                      ╰╮
     31.9 ┤                       ╰───────────────
                regular_tokens_available (MiB)

     3.0 ┤╭───────────────────────╮
     2.8 ┤│                       │
     2.6 ┤╭────────────────────────╮
     2.4 ┤│                       ╰│
     2.2 ┤│                        │
     2.0 ┤│                        │
     1.8 ┤│                        │
     1.6 ┤│                        ╰─────────────
     1.4 ┤│
     1.2 ┤│
     1.0 ┤│
     0.8 ┤│
     0.6 ┤│
     0.4 ┤│
     0.2 ┤│
     0.0 ┼╯
          rate(regular_tokens_{deducted,returned}) (MiB/s)

     1.0 ┤                                 ╭╮   ╭
     0.9 ┤                            ╭╮   ││   │
     0.9 ┤                            ││   ││   │
     0.8 ┤                            ││   ││   │
     0.7 ┤                            ││   ││   │
     0.7 ┤                         ╭╮ ││╭╮ ││   │
     0.6 ┤                         ││ ││││ ││╭─╮│
     0.5 ┤                         │╰╮│││╰╮│││ ││
     0.5 ┤                         │ ││││ ││││ ││
     0.4 ┤                         │ ││││ ││││ ││
     0.3 ┤                         │ ││││ ││││ ││
     0.3 ┤                         │ ╰╯││ ││││ ││
     0.2 ┤                         │   ││ ╰╯╰╯ ╰╯
     0.1 ┤                        ╭╯   ╰╯
     0.1 ┤                        │
     0.0 ┼────────────────────────╯
           regular_blocked_stream_count (streams)

     30.0 ┤╭───────────────────────╮
     28.0 ┤│                       ╰╮
     26.0 ┤│                        ╰─────────────
     24.0 ┤│
     22.0 ┤│
     20.0 ┤│
     18.0 ┤│
     16.0 ┤│
     14.0 ┤│
     12.0 ┤│
     10.0 ┤│
      8.0 ┤│
      6.0 ┤│                        ╭─────────────
      4.0 ┤│                        │
      2.0 ┤│                       ╭╯
      0.0 ┼────────────────────────╯
           rate(regular_requests_{admitted,waiting}) (reqs/s)
    ----
    ----

Release note: None
  • Loading branch information
irfansharif committed Feb 17, 2023
1 parent 51be9f0 commit fb92a4f
Show file tree
Hide file tree
Showing 30 changed files with 3,181 additions and 623 deletions.
11 changes: 11 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ ALL_TESTS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
"//pkg/kv/kvserver/liveness:liveness_test",
"//pkg/kv/kvserver/logstore:logstore_test",
Expand Down Expand Up @@ -1234,6 +1237,11 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
Expand Down Expand Up @@ -2643,6 +2651,9 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/kvstorage:get_x_data",
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ go_library(
srcs = [
"doc.go",
"kvflowcontrol.go",
"testing_knobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
Expand Down
48 changes: 45 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,26 @@ package kvflowcontrol
// stream is set to the raft log position of the command removing the replica,
// so stale AdmittedRaftLogEntries messages can be discarded.
//
// I11. What happens when a node is restarted and is being caught up rapidly
// through raft log appends? We know of cases where the initial log appends
// and subsequent state machine application be large enough to invert the
// LSM[^9]. Imagine large block writes with a uniform key distribution; we
// may persist log entries rapidly across many replicas (without inverting
// the LSM, so follower pausing is also of no help) and during state
// machine application, create lots of overlapping files/sublevels in L0.
// - We want to pace the initial rate of log appends while factoring in the
// effect of the subsequent state machine application on L0 (modulo [^9]). We
// can use flow tokens for this too. In I3a we outlined how for quorum writes
// that includes a replica on some recently re-started node, we need to wait
// for it to be sufficiently caught before deducting/blocking for flow tokens.
// Until that point we can use flow tokens on sender nodes that wish to send
// catchup MsgApps to the newly-restarted node. Similar to the steady state,
// flow tokens are only be returned once log entries are logically admitted
// (which takes into account any apply-time write amplification, modulo [^9]).
// Once the node is sufficiently caught up with respect to all its raft logs,
// it can transition into the mode described in I3a where we deduct/block for
// flow tokens for subsequent quorum writes.
//
// ---
//
// [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent
Expand All @@ -315,9 +335,8 @@ package kvflowcontrol
// [^2]: Over which we're dispatching kvflowcontrolpb.AdmittedRaftLogEntries.
// [^3]: kvflowcontrol.DispatchReader implementations do this as part of
// PendingDispatchFor.
// [^4]: Using DeductedTokensUpto + ReturnAllTokensUpto on
// kvflowcontrol.Handler.
// [^5]: Using ReturnAllTokensUpto on kvflowcontrol.Handler.
// [^4]: Using DisconnectStream on kvflowcontrol.Handler.
// [^5]: Using ConnectStream on kvflowcontrol.Handler.
// [^6]: DeductTokens on kvflowcontrol.Controller returns whether the deduction
// was done.
// [^7]: When a node is crashed, instead of ignoring the underlying flow token
Expand All @@ -331,6 +350,29 @@ package kvflowcontrol
// Admit(), or (ii) don't DeductTokens (Admit() is rendered a no-op),
// we're being somewhat optimistic, which is fine.
// [^8]: Using ReturnTokensUpto on kvflowcontrol.Handle.
// [^9]: With async raft storage writes (#17500, etcd-io/raft#8), we can
// decouple raft log appends and state machine application (see #94854 and
// #94853). So we could append at a higher rate than applying. Since
// application can be arbitrarily deferred, we cause severe LSM
// inversions. Do we want some form of pacing of log appends then,
// relative to observed state machine application? Perhaps specifically in
// cases where we're more likely to append faster than apply, like node
// restarts. We're likely to defeat AC's IO control otherwise.
// - For what it's worth, this "deferred application with high read-amp"
// was also a problem before async raft storage writes. Consider many
// replicas on an LSM, all of which appended a few raft log entries
// without applying, and at apply time across all those replicas, we end
// up inverting the LSM.
// - Since we don't want to wait below raft, one way bound the lag between
// appended entries and applied ones is to only release flow tokens for
// an entry at position P once the applied state position >= P - delta.
// We'd have to be careful, if we're not applying due to quorum loss
// (as a result of remote node failure(s)), we don't want to deplete
// flow tokens and cause interference on other ranges.
// - If this all proves too complicated, we could just not let state
// machine application get significantly behind due to local scheduling
// reasons by using the same goroutine to do both async raft log writes
// and state machine application.
//
// TODO(irfansharif): These descriptions are too high-level, imprecise and
// possibly wrong. Fix that. After implementing these interfaces and integrating
Expand Down
76 changes: 50 additions & 26 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type Stream struct {
StoreID roachpb.StoreID
}

// ConnectedStream models a stream over which we're actively replicating data
// traffic. The embedded channel is signaled when the stream is disconnected,
// for example when (i) the remote node has crashed, (ii) bidirectional gRPC
// streams break, (iii) we've paused replication traffic to it, (iv) truncated
// our raft log ahead it, and more. Whenever that happens, we unblock inflight
// requests waiting for flow tokens.
type ConnectedStream interface {
Stream() Stream
Disconnected() <-chan struct{}
}

// Tokens represent the finite capacity of a given stream, expressed in bytes
// for data we're looking to replicate. Use of replication streams are
// predicated on tokens being available.
Expand All @@ -46,12 +57,13 @@ type Tokens int64
type Controller interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority, create-time, and over the given stream. This
// blocks until there are flow tokens available.
Admit(context.Context, admissionpb.WorkPriority, time.Time, Stream) error
// blocks until there are flow tokens available or the stream disconnects,
// subject to context cancellation.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error
// DeductTokens deducts (without blocking) flow tokens for replicating work
// with given priority over the given stream. Requests are expected to
// have been Admit()-ed first.
DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream) (deducted bool)
DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream)
// ReturnTokens returns flow tokens for the given stream. These tokens are
// expected to have been deducted earlier with the same priority provided
// here.
Expand All @@ -67,7 +79,8 @@ type Controller interface {
// Handle is used to interface with replication flow control; it's typically
// backed by a node-level kvflowcontrol.Controller. Handles are held on replicas
// initiating replication traffic, i.e. are both the leaseholder and raft
// leader, and manage multiple Streams (one per active replica) underneath.
// leader, and manage multiple streams underneath (typically one per active
// member of the raft group).
//
// When replicating log entries, these replicas choose the log position
// (term+index) the data is to end up at, and use this handle to track the token
Expand All @@ -79,34 +92,45 @@ type Controller interface {
type Handle interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority and create-time. This blocks until there are
// flow tokens available.
Admit(context.Context, admissionpb.WorkPriority, time.Time)
// flow tokens available for all connected streams.
Admit(context.Context, admissionpb.WorkPriority, time.Time) error
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority to members of the raft group. The deduction,
// if successful, is tracked with respect to the specific raft log position
// it's expecting it to end up in. Requests are assumed to have been
// Admit()-ed first.
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expecting it
// to end up in, log positions that monotonically increase. Requests are
// assumed to have been Admit()-ed first.
DeductTokensFor(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
// DeductedTokensUpto returns the highest log position for which we've
// deducted flow tokens for, over the given stream.
DeductedTokensUpto(context.Context, Stream) kvflowcontrolpb.RaftLogPosition
// ReturnTokensUpto returns all previously deducted tokens of a given
// priority for all log positions less than or equal to the one specified.
// It does for the specific stream. Once returned, subsequent attempts to
// return tokens upto the same position or lower are no-ops.
// return tokens upto the same position or lower are no-ops. It's used when
// entries at specific log positions have been admitted below-raft.
//
// NB: Another use is during successive lease changes (out and back) within
// the same raft term -- we want to both free up tokens from when we lost
// the lease, and also ensure we discard attempts to return them (on hearing
// about AdmittedRaftLogEntries replicated under the earlier lease).
ReturnTokensUpto(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
// ReturnAllTokensUpto is like ReturnTokensUpto but does so across all
// priorities.
// ConnectStream connects a stream (typically pointing to an active member
// of the raft group) to the handle. Subsequent calls to Admit() will block
// until flow tokens are available for the stream, or for it to be
// disconnected via DisconnectStream. DeductTokensFor will also deduct
// tokens for all connected streams. The log position is used as a lower
// bound, beneath which all token deductions/returns are rendered no-ops.
ConnectStream(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
// DisconnectStream disconnects a stream from the handle. When disconnecting
// a stream, (a) all previously held flow tokens are released and (b) we
// unblock all requests waiting in Admit() for this stream's flow tokens in
// particular.
//
// NB: This is used when a replica on the other end of a stream gets caught
// up via snapshot (say, after a log truncation), where we then don't expect
// dispatches for the individual AdmittedRaftLogEntries between what it
// admitted last and its latest RaftLogPosition. Another use is during
// successive lease changes (out and back) within the same raft term -- we
// want to both free up tokens from when we lost the lease, and also ensure
// that attempts to return them (on hearing about AdmittedRaftLogEntries
// replicated under the earlier lease), we discard the attempts.
ReturnAllTokensUpto(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
// This is typically used when we're no longer replicating data to a member
// of the raft group, because (a) it crashed, (b) it's no longer part of the
// raft group, (c) we've decided to pause it, (d) we've truncated the raft
// log ahead of it and expect it to be caught up via snapshot, and more. In
// all these cases we don't expect dispatches for individual
// AdmittedRaftLogEntries between what it admitted last and its latest
// RaftLogPosition.
DisconnectStream(context.Context, Stream)
// Close closes the handle and returns all held tokens back to the
// underlying controller. Typically used when the replica loses its lease
// and/or raft leadership, or ends up getting GC-ed (if it's being
Expand Down Expand Up @@ -134,7 +158,7 @@ type DispatchWriter interface {
// piggybacking) has not taken place.
//
// NB: PendingDispatchFor is expected to remove dispatches from the pending
// list. If the GRPC stream we're sending it over happens to break, we drop
// list. If the gRPC stream we're sending it over happens to break, we drop
// these dispatches. The node waiting these dispatches is expected to react to
// the stream breaking by freeing up all held tokens.
type DispatchReader interface {
Expand Down
10 changes: 1 addition & 9 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,21 @@ go_library(

go_test(
name = "kvflowcontroller_test",
srcs = [
"kvflowcontrol_token_adjustment_test.go",
"kvflowcontroller_simulation_test.go",
],
srcs = ["kvflowcontrol_token_adjustment_test.go"],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":kvflowcontroller"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/asciitsdb",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_guptarohit_asciigraph//:asciigraph",
"@com_github_mkungla_bexp_v3//:bexp",
"@com_github_stretchr_testify//require",
],
)
Expand Down
Loading

0 comments on commit fb92a4f

Please sign in to comment.