kvflowhandle: implement kvflowcontrol.Handle #96642

Merged: 1 commit, Feb 18, 2023
11 changes: 11 additions & 0 deletions pkg/BUILD.bazel
@@ -209,6 +209,9 @@ ALL_TESTS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
"//pkg/kv/kvserver/liveness:liveness_test",
"//pkg/kv/kvserver/logstore:logstore_test",
@@ -1234,6 +1237,11 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
@@ -2643,6 +2651,9 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/kvstorage:get_x_data",
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/BUILD.bazel
@@ -6,10 +6,12 @@ go_library(
srcs = [
"doc.go",
"kvflowcontrol.go",
"testing_knobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
48 changes: 45 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/doc.go
@@ -307,6 +307,26 @@ package kvflowcontrol
// stream is set to the raft log position of the command removing the replica,
// so stale AdmittedRaftLogEntries messages can be discarded.
//
// I11. What happens when a node is restarted and is being caught up rapidly
// through raft log appends? We know of cases where the initial log appends
// and subsequent state machine application can be large enough to invert the
// LSM[^9]. Imagine large block writes with a uniform key distribution; we
// may persist log entries rapidly across many replicas (without inverting
// the LSM, so follower pausing is also of no help) and during state
// machine application, create lots of overlapping files/sublevels in L0.
// - We want to pace the initial rate of log appends while factoring in the
// effect of the subsequent state machine application on L0 (modulo [^9]). We
// can use flow tokens for this too. In I3a we outlined how for quorum writes
// that include a replica on some recently re-started node, we need to wait
// for it to be sufficiently caught up before deducting/blocking for flow tokens.
// Until that point we can use flow tokens on sender nodes that wish to send
// catchup MsgApps to the newly-restarted node. Similar to the steady state,
// flow tokens are only returned once log entries are logically admitted
// (which takes into account any apply-time write amplification, modulo [^9]).
// Once the node is sufficiently caught up with respect to all its raft logs,
// it can transition into the mode described in I3a where we deduct/block for
// flow tokens for subsequent quorum writes.
//
// ---
//
// [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent
@@ -315,9 +335,8 @@
// [^2]: Over which we're dispatching kvflowcontrolpb.AdmittedRaftLogEntries.
// [^3]: kvflowcontrol.DispatchReader implementations do this as part of
// PendingDispatchFor.
// [^4]: Using DeductedTokensUpto + ReturnAllTokensUpto on
// kvflowcontrol.Handler.
// [^5]: Using ReturnAllTokensUpto on kvflowcontrol.Handler.
// [^4]: Using DisconnectStream on kvflowcontrol.Handler.
// [^5]: Using ConnectStream on kvflowcontrol.Handler.
// [^6]: DeductTokens on kvflowcontrol.Controller returns whether the deduction
// was done.
// [^7]: When a node is crashed, instead of ignoring the underlying flow token
@@ -331,6 +350,29 @@
// Admit(), or (ii) don't DeductTokens (Admit() is rendered a no-op),
// we're being somewhat optimistic, which is fine.
// [^8]: Using ReturnTokensUpto on kvflowcontrol.Handle.
// [^9]: With async raft storage writes (#17500, etcd-io/raft#8), we can
// decouple raft log appends and state machine application (see #94854 and
// #94853). So we could append at a higher rate than applying. Since
// application can be arbitrarily deferred, we risk severe LSM
// inversions. Do we want some form of pacing of log appends then,
// relative to observed state machine application? Perhaps specifically in
// cases where we're more likely to append faster than apply, like node
// restarts. We're likely to defeat AC's IO control otherwise.
// - For what it's worth, this "deferred application with high read-amp"
// was also a problem before async raft storage writes. Consider many
// replicas on an LSM, all of which appended a few raft log entries
// without applying, and at apply time across all those replicas, we end
// up inverting the LSM.
// - Since we don't want to wait below raft, one way to bound the lag between
// appended entries and applied ones is to only release flow tokens for
// an entry at position P once the applied state position >= P - delta.
// We'd have to be careful: if we're not applying due to quorum loss
// (as a result of remote node failure(s)), we don't want to deplete
// flow tokens and cause interference on other ranges.
// - If this all proves too complicated, we could just not let state
// machine application get significantly behind due to local scheduling
// reasons by using the same goroutine to do both async raft log writes
// and state machine application.
//
// TODO(irfansharif): These descriptions are too high-level, imprecise and
// possibly wrong. Fix that. After implementing these interfaces and integrating
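The token-release pacing idea in doc.go's footnote [^9] above (only return flow tokens for an entry at position P once the applied state has caught up to within some delta of it) can be made concrete with a small helper. This is not code from the PR; the type, the helper, and the delta parameter are illustrative assumptions, and positions are compared by index only.

```go
// A minimal sketch (not part of this PR) of the pacing idea from footnote
// [^9]: only release flow tokens for an appended entry once state machine
// application has caught up to within `delta` entries of it.
package main

import "fmt"

// raftLogPosition is a simplified stand-in for kvflowcontrolpb.RaftLogPosition;
// for this sketch we compare by index only and ignore the term.
type raftLogPosition struct {
	term  uint64
	index uint64
}

// okToReturnTokens reports whether tokens deducted for an entry appended at
// `appended` may be returned, given the highest applied position `applied`.
// Holding tokens until application is within `delta` bounds how far raft log
// appends can run ahead of state machine application before writers block.
func okToReturnTokens(appended, applied raftLogPosition, delta uint64) bool {
	return applied.index+delta >= appended.index
}

func main() {
	applied := raftLogPosition{term: 5, index: 100}
	fmt.Println(okToReturnTokens(raftLogPosition{term: 5, index: 110}, applied, 20)) // true: within delta
	fmt.Println(okToReturnTokens(raftLogPosition{term: 5, index: 150}, applied, 20)) // false: application lags too far
}
```

As the footnote cautions, a real implementation would need to avoid depleting tokens when application stalls for reasons unrelated to local overload, for example quorum loss.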
76 changes: 50 additions & 26 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
@@ -33,6 +33,17 @@ type Stream struct {
StoreID roachpb.StoreID
}

// ConnectedStream models a stream over which we're actively replicating data
// traffic. The embedded channel is signaled when the stream is disconnected,
// for example when (i) the remote node has crashed, (ii) bidirectional gRPC
// streams break, (iii) we've paused replication traffic to it, (iv) truncated
// our raft log ahead of it, and more. Whenever that happens, we unblock inflight
// requests waiting for flow tokens.
type ConnectedStream interface {
Stream() Stream
Disconnected() <-chan struct{}
}

// Tokens represent the finite capacity of a given stream, expressed in bytes
// for data we're looking to replicate. Use of replication streams is
// predicated on tokens being available.
@@ -46,12 +57,13 @@ type Tokens int64
type Controller interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority, create-time, and over the given stream. This
// blocks until there are flow tokens available.
Admit(context.Context, admissionpb.WorkPriority, time.Time, Stream) error
// blocks until there are flow tokens available or the stream disconnects,
// subject to context cancellation.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error
// DeductTokens deducts (without blocking) flow tokens for replicating work
// with given priority over the given stream. Requests are expected to
// have been Admit()-ed first.
DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream) (deducted bool)
DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream)
// ReturnTokens returns flow tokens for the given stream. These tokens are
// expected to have been deducted earlier with the same priority provided
// here.
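To illustrate the ConnectedStream semantics above (a per-stream channel that, once closed, unblocks anyone waiting in Admit()), here is a self-contained sketch. It uses local stand-in types rather than the kvflowcontrol package, and the waitForTokens helper is hypothetical, meant only to show the select shape an Admit()-style wait might take.

```go
// A standalone sketch (not part of this PR) of the ConnectedStream idea:
// a stream identity plus a channel closed exactly once on disconnect.
package main

import (
	"context"
	"fmt"
	"sync"
)

// stream is a local stand-in for kvflowcontrol.Stream.
type stream struct {
	storeID int32
}

// connectedStream pairs a stream with a channel that is closed exactly once
// when the stream disconnects, mirroring the Disconnected() contract above.
type connectedStream struct {
	stream       stream
	disconnected chan struct{}
	once         sync.Once
}

func newConnectedStream(s stream) *connectedStream {
	return &connectedStream{stream: s, disconnected: make(chan struct{})}
}

func (c *connectedStream) Stream() stream                { return c.stream }
func (c *connectedStream) Disconnected() <-chan struct{} { return c.disconnected }

// disconnect is idempotent; closing the channel releases every waiter at once.
func (c *connectedStream) disconnect() {
	c.once.Do(func() { close(c.disconnected) })
}

// waitForTokens shows the shape of an Admit()-style wait: it returns when
// tokens become available, when the stream disconnects, or when the caller's
// context is canceled.
func waitForTokens(ctx context.Context, cs *connectedStream, tokensAvailable <-chan struct{}) error {
	select {
	case <-tokensAvailable:
		return nil // admitted
	case <-cs.Disconnected():
		return nil // stream broke; don't keep writers blocked on its tokens
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	cs := newConnectedStream(stream{storeID: 1})
	go cs.disconnect() // simulate the remote node crashing
	fmt.Println(waitForTokens(context.Background(), cs, make(chan struct{})))
}
```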
@@ -67,7 +79,8 @@
// Handle is used to interface with replication flow control; it's typically
// backed by a node-level kvflowcontrol.Controller. Handles are held on replicas
// initiating replication traffic, i.e. are both the leaseholder and raft
// leader, and manage multiple Streams (one per active replica) underneath.
// leader, and manage multiple streams underneath (typically one per active
// member of the raft group).
//
// When replicating log entries, these replicas choose the log position
// (term+index) the data is to end up at, and use this handle to track the token
@@ -79,34 +92,45 @@
type Handle interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority and create-time. This blocks until there are
// flow tokens available.
Admit(context.Context, admissionpb.WorkPriority, time.Time)
// flow tokens available for all connected streams.
Admit(context.Context, admissionpb.WorkPriority, time.Time) error
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority to members of the raft group. The deduction,
// if successful, is tracked with respect to the specific raft log position
// it's expecting it to end up in. Requests are assumed to have been
// Admit()-ed first.
// work with given priority along connected streams. The deduction is
// tracked with respect to the specific raft log position it's expected to
// end up in; these log positions increase monotonically. Requests are
// assumed to have been Admit()-ed first.
DeductTokensFor(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
// DeductedTokensUpto returns the highest log position for which we've
// deducted flow tokens, over the given stream.
DeductedTokensUpto(context.Context, Stream) kvflowcontrolpb.RaftLogPosition
// ReturnTokensUpto returns all previously deducted tokens of a given
// priority for all log positions less than or equal to the one specified.
// It does so for the specific stream. Once returned, subsequent attempts to
// return tokens upto the same position or lower are no-ops.
// return tokens upto the same position or lower are no-ops. It's used when
// entries at specific log positions have been admitted below-raft.
//
// NB: Another use is during successive lease changes (out and back) within
// the same raft term -- we want to both free up tokens from when we lost
// the lease, and also ensure we discard attempts to return them (on hearing
// about AdmittedRaftLogEntries replicated under the earlier lease).
ReturnTokensUpto(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
// ReturnAllTokensUpto is like ReturnTokensUpto but does so across all
// priorities.
// ConnectStream connects a stream (typically pointing to an active member
// of the raft group) to the handle. Subsequent calls to Admit() will block
// until flow tokens are available for the stream, or until it is
// disconnected via DisconnectStream. DeductTokensFor will also deduct
// tokens for all connected streams. The log position is used as a lower
// bound, beneath which all token deductions/returns are rendered no-ops.
ConnectStream(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
// DisconnectStream disconnects a stream from the handle. When disconnecting
// a stream, (a) all previously held flow tokens are released and (b) we
// unblock all requests waiting in Admit() for this stream's flow tokens in
// particular.
//
// NB: This is used when a replica on the other end of a stream gets caught
// up via snapshot (say, after a log truncation), where we then don't expect
// dispatches for the individual AdmittedRaftLogEntries between what it
// admitted last and its latest RaftLogPosition. Another use is during
// successive lease changes (out and back) within the same raft term -- we
// want to both free up tokens from when we lost the lease, and also ensure
// that we discard attempts to return them (on hearing about
// AdmittedRaftLogEntries replicated under the earlier lease).
ReturnAllTokensUpto(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
// This is typically used when we're no longer replicating data to a member
// of the raft group, because (a) it crashed, (b) it's no longer part of the
// raft group, (c) we've decided to pause it, (d) we've truncated the raft
// log ahead of it and expect it to be caught up via snapshot, and more. In
// all these cases we don't expect dispatches for individual
// AdmittedRaftLogEntries between what it admitted last and its latest
// RaftLogPosition.
DisconnectStream(context.Context, Stream)
// Close closes the handle and returns all held tokens back to the
// underlying controller. Typically used when the replica loses its lease
// and/or raft leadership, or ends up getting GC-ed (if it's being
@@ -134,7 +158,7 @@ type DispatchWriter interface {
// piggybacking) has not taken place.
//
// NB: PendingDispatchFor is expected to remove dispatches from the pending
// list. If the GRPC stream we're sending it over happens to break, we drop
// list. If the gRPC stream we're sending it over happens to break, we drop
// these dispatches. The node waiting on these dispatches is expected to react to
// the stream breaking by freeing up all held tokens.
type DispatchReader interface {
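As a rough orientation for how the Handle interface above is meant to be driven, here is a hedged sketch of a proposal-path caller. The proposer type and its surrounding plumbing are hypothetical and not part of this PR; only the Handle method calls mirror the interface in kvflowcontrol.go.

```go
// A hypothetical caller of kvflowcontrol.Handle; only the Handle calls below
// mirror the interface added in this PR, the rest is illustrative plumbing.
package replicaflow

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

type proposer struct {
	handle kvflowcontrol.Handle
}

// propose blocks until flow tokens are available across all connected
// streams, then deducts them against the raft log position the entry is
// expected to land at. Tokens come back later, once we learn the entry was
// admitted below-raft (see onAdmitted).
func (p *proposer) propose(
	ctx context.Context,
	pri admissionpb.WorkPriority,
	createTime time.Time,
	pos kvflowcontrolpb.RaftLogPosition,
	size kvflowcontrol.Tokens,
) error {
	if err := p.handle.Admit(ctx, pri, createTime); err != nil {
		return err
	}
	p.handle.DeductTokensFor(ctx, pri, pos, size) // non-blocking
	return nil
}

// onAdmitted is invoked when AdmittedRaftLogEntries tell us that entries up
// to pos were admitted below-raft over the given stream; it returns the
// corresponding tokens.
func (p *proposer) onAdmitted(
	ctx context.Context,
	pri admissionpb.WorkPriority,
	pos kvflowcontrolpb.RaftLogPosition,
	stream kvflowcontrol.Stream,
) {
	p.handle.ReturnTokensUpto(ctx, pri, pos, stream)
}

// onStreamNoLongerActive disconnects a stream we've stopped replicating to
// (crashed, removed, paused, or caught up via snapshot), releasing all held
// tokens and unblocking Admit() waiters.
func (p *proposer) onStreamNoLongerActive(ctx context.Context, stream kvflowcontrol.Stream) {
	p.handle.DisconnectStream(ctx, stream)
}
```

Per the interface comments above, ConnectStream (with the stream's initial raft log position) would be invoked when a new member starts being replicated to, and Close when the replica loses its lease and/or raft leadership.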
10 changes: 1 addition & 9 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel
@@ -25,29 +25,21 @@ go_library(

go_test(
name = "kvflowcontroller_test",
srcs = [
"kvflowcontrol_token_adjustment_test.go",
"kvflowcontroller_simulation_test.go",
],
srcs = ["kvflowcontrol_token_adjustment_test.go"],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":kvflowcontroller"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/asciitsdb",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_guptarohit_asciigraph//:asciigraph",
"@com_github_mkungla_bexp_v3//:bexp",
"@com_github_stretchr_testify//require",
],
)