kvflow{handle,tokentracker,simulator}: implement kvflowcontrol.Handle
Part of #95563.

kvflowcontrol.Handle is used to interface with replication flow control;
it's typically backed by a node-level kvflowcontrol.Controller. Handles
are held on replicas initiating replication traffic, i.e. those that are both
the leaseholder and raft leader, and manage multiple streams underneath
(typically one per active member of the raft group).

When replicating log entries, these replicas choose the log position
(term+index) the data is to end up at, and use this handle to track the
token deductions on a per log position basis. When informed of admitted
log entries on the receiving end of the stream, we free up tokens by
specifying the highest log position up to which we've admitted
(below-raft admission, for a given priority, takes log position into
account -- see kvflowcontrolpb.AdmittedRaftLogEntries for more details).
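
The per-log-position bookkeeping described above can be sketched roughly as follows. This is a minimal illustration, not the code from this commit: LogPosition, Tracker, Track and Untrack are hypothetical names standing in for kvflowcontrolpb.RaftLogPosition and whatever the kvflowtokentracker package actually exposes, and priorities and streams are left out.

```go
package main

import "fmt"

// LogPosition is a hypothetical stand-in for kvflowcontrolpb.RaftLogPosition.
type LogPosition struct {
	Term, Index uint64
}

// LessEq reports whether p sorts at or before o (term first, then index).
func (p LogPosition) LessEq(o LogPosition) bool {
	if p.Term != o.Term {
		return p.Term < o.Term
	}
	return p.Index <= o.Index
}

// Tokens mirrors kvflowcontrol.Tokens: capacity expressed in bytes.
type Tokens int64

// deduction pairs a token deduction with the log position it was made for.
type deduction struct {
	pos    LogPosition
	tokens Tokens
}

// Tracker (hypothetical) records deductions in increasing log position order.
type Tracker struct {
	tracked []deduction
}

// Track records a deduction for pos. It returns false, and records nothing,
// if pos does not sort strictly after the last tracked position.
func (t *Tracker) Track(pos LogPosition, tokens Tokens) bool {
	if n := len(t.tracked); n > 0 && pos.LessEq(t.tracked[n-1].pos) {
		return false
	}
	t.tracked = append(t.tracked, deduction{pos, tokens})
	return true
}

// Untrack drops every deduction at a position <= upto and returns the sum of
// their tokens. Calling it again with the same or a lower position returns 0.
func (t *Tracker) Untrack(upto LogPosition) Tokens {
	var returned Tokens
	i := 0
	for ; i < len(t.tracked) && t.tracked[i].pos.LessEq(upto); i++ {
		returned += t.tracked[i].tokens
	}
	t.tracked = t.tracked[i:]
	return returned
}

func main() {
	var t Tracker
	t.Track(LogPosition{1, 10}, 100)
	t.Track(LogPosition{1, 11}, 200)
	t.Track(LogPosition{2, 12}, 50)
	fmt.Println(t.Untrack(LogPosition{1, 11})) // tokens for 1/10 and 1/11
	fmt.Println(t.Untrack(LogPosition{1, 11})) // nothing left at or below 1/11
}
```

Keeping deductions sorted by position is what lets a single "admitted up to" position free everything at or below it in one pass, and makes repeated returns for the same position harmless.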

We also extend the testing framework introduced in #95905 to support
writing tests for kvflowcontrol.Handle -- it's now pulled into its own
kvflowsimulator package.

Release note: None
irfansharif committed Feb 11, 2023
1 parent 9ea0ef2 commit 93aa058
Showing 26 changed files with 2,769 additions and 618 deletions.
9 changes: 9 additions & 0 deletions pkg/BUILD.bazel
@@ -207,6 +207,8 @@ ALL_TESTS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
"//pkg/kv/kvserver/liveness:liveness_test",
"//pkg/kv/kvserver/logstore:logstore_test",
@@ -1225,6 +1227,10 @@ GO_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:kvflowcontroller_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:kvflowhandle",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:kvflowsimulator_test",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:kvflowtokentracker_test",
"//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
@@ -2629,6 +2635,9 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/kvflowcontrol:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontroller:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowhandle:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowsimulator:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/kvstorage:get_x_data",
41 changes: 38 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/doc.go
@@ -307,6 +307,26 @@ package kvflowcontrol
// stream is set to the raft log position of the command removing the replica,
// so stale AdmittedRaftLogEntries messages can be discarded.
//
// I11. What happens when a node is restarted and is being caught up rapidly
// through raft log appends? We know of cases where the initial log appends
// and subsequent state machine application can be large enough to invert the
// LSM[^9]. Imagine large block writes with a uniform key distribution; we
// may persist log entries rapidly across many replicas (without inverting
// the LSM, so follower pausing is also of no help) and during state
// application, create lots of overlapping files/sublevels in L0.
// - We want to pace the initial rate of log appends while factoring in the
// effect of the subsequent state machine application on L0 (modulo [^9]). We
// can use flow tokens for this too. In I3a we outlined how for quorum writes
// that includes a replica on some recently re-started node, we need to wait
// for it to be sufficiently caught up before deducting/blocking for flow tokens.
// Until that point we can use flow tokens on sender nodes that wish to send
// MsgApps to the newly-restarted node. Similar to the steady state, flow
// tokens will only be returned once the entries are logically admitted (which
// takes into account any apply-time write amplification, modulo [^9]). Once
// the node is sufficiently caught up with respect to all its raft logs, it
// can transition into the mode described in I3a where we deduct/block for
// flow tokens for subsequent quorum writes.
//
// ---
//
// [^1]: kvserverpb.RaftMessageRequest is the unit of what's sent
@@ -315,9 +335,8 @@ package kvflowcontrol
// [^2]: Over which we're dispatching kvflowcontrolpb.AdmittedRaftLogEntries.
// [^3]: kvflowcontrol.DispatchReader implementations do this as part of
// PendingDispatchFor.
// [^4]: Using DeductedTokensUpto + ReturnAllTokensUpto on
// kvflowcontrol.Handler.
// [^5]: Using ReturnAllTokensUpto on kvflowcontrol.Handler.
// [^4]: Using DisconnectStream on kvflowcontrol.Handler.
// [^5]: Using ConnectStream on kvflowcontrol.Handler.
// [^6]: DeductTokens on kvflowcontrol.Controller returns whether the deduction
// was done.
// [^7]: When a node is crashed, instead of ignoring the underlying flow token
@@ -331,6 +350,22 @@ package kvflowcontrol
// Admit(), or (ii) don't DeductTokens (Admit() is rendered a no-op),
// we're being somewhat optimistic, which is fine.
// [^8]: Using ReturnTokensUpto on kvflowcontrol.Handle.
// [^9]: With async raft storage writes, there's no interleaving of raft log
// appends and state machine applications. So we could append at a higher
// rate than applying. Since application can be arbitrarily deferred, we
// cause severe LSM inversions. Do we want some form of pacing of log
// appends then, relative to observed state machine application?
// Perhaps specifically in cases where we're more likely to append faster
// than apply, like node restarts. We're likely to defeat AC's IO control
// otherwise.
// - For what it's worth, this "deferred application with high read-amp"
// was also a problem before async raft storage writes. Consider many
// replicas on an LSM, all of which appended a few raft log entries
// without applying, and at apply time across all those replicas, we end
// up inverting the LSM.
// - Since we don't want to wait below raft, one way to bound the lag between
// appended entries and applied ones is to only release flow tokens for
// an entry at position P once the applied state position >= P - delta.
//
// TODO(irfansharif): These descriptions are too high-level, imprecise and
// possibly wrong. Fix that. After implementing these interfaces and integrating
67 changes: 44 additions & 23 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go
@@ -33,6 +33,17 @@ type Stream struct {
StoreID roachpb.StoreID
}

// ConnectedStream models a stream over which we're actively replicating data
// traffic. The embedded channel is signaled when the stream is disconnected,
// for example when (i) the remote node has crashed, (ii) bidirectional gRPC
// streams break, (iii) we've paused replication traffic to it, (iv) truncated
// our raft log ahead of it, and more. Whenever that happens, we unblock inflight
// requests waiting for flow tokens.
type ConnectedStream interface {
Stream() Stream
Disconnected() <-chan struct{}
}

// Tokens represent the finite capacity of a given stream, expressed in bytes
// for data we're looking to replicate. Use of replication streams is
// predicated on tokens being available.
@@ -46,12 +57,13 @@ type Tokens int64
type Controller interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority, create-time, and over the given stream. This
// blocks until there are flow tokens available.
Admit(context.Context, admissionpb.WorkPriority, time.Time, Stream) error
// blocks until there are flow tokens available or the stream disconnects,
// subject to context cancellation.
Admit(context.Context, admissionpb.WorkPriority, time.Time, ConnectedStream) error
// DeductTokens deducts (without blocking) flow tokens for replicating work
// with given priority over the given stream. Requests are expected to
// have been Admit()-ed first.
DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream) (deducted bool)
DeductTokens(context.Context, admissionpb.WorkPriority, Tokens, Stream)
// ReturnTokens returns flow tokens for the given stream. These tokens are
// expected to have been deducted earlier with the same priority provided
// here.
@@ -67,7 +79,8 @@ type Controller interface {
// Handle is used to interface with replication flow control; it's typically
// backed by a node-level kvflowcontrol.Controller. Handles are held on replicas
// initiating replication traffic, i.e. are both the leaseholder and raft
// leader, and manage multiple Streams (one per active replica) underneath.
// leader, and manage multiple streams underneath (typically one per active
// member of the raft group).
//
// When replicating log entries, these replicas choose the log position
// (term+index) the data is to end up at, and use this handle to track the token
@@ -79,34 +92,42 @@ type Controller interface {
type Handle interface {
// Admit seeks admission to replicate data, regardless of size, for work
// with the given priority and create-time. This blocks until there are
// flow tokens available.
Admit(context.Context, admissionpb.WorkPriority, time.Time)
// flow tokens available for all connected streams.
Admit(context.Context, admissionpb.WorkPriority, time.Time) error
// DeductTokensFor deducts (without blocking) flow tokens for replicating
// work with given priority to members of the raft group. The deduction,
// work with given priority along connected streams. The deduction,
// if successful, is tracked with respect to the specific raft log position
// it's expecting it to end up in. Requests are assumed to have been
// Admit()-ed first.
DeductTokensFor(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
// DeductedTokensUpto returns the highest log position for which we've
// deducted flow tokens for, over the given stream.
DeductedTokensUpto(context.Context, Stream) kvflowcontrolpb.RaftLogPosition
// ReturnTokensUpto returns all previously deducted tokens of a given
// priority for all log positions less than or equal to the one specified.
// It does so for the specific stream. Once returned, subsequent attempts to
// return tokens upto the same position or lower are no-ops.
// return tokens upto the same position or lower are no-ops. It's used when
// entries at specific log positions have been admitted below-raft.
//
// NB: Another use is during successive lease changes (out and back) within
// the same raft term -- we want to both free up tokens from when we lost
// the lease, and also ensure we discard attempts to return them (on hearing
// about AdmittedRaftLogEntries replicated under the earlier lease).
ReturnTokensUpto(context.Context, admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
// ReturnAllTokensUpto is like ReturnTokensUpto but does so across all
// priorities.
// ConnectStream connects a stream (typically pointing to an active member
// of the raft group) to the handle. Subsequent calls to Admit() will block
// until flow tokens are available for the stream, or for it to be
// disconnected via DisconnectStream. DeductTokensFor will also deduct
// tokens for all connected streams.
ConnectStream(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
// DisconnectStream disconnects a stream from the handle. When disconnecting
// a stream, all previously held flow tokens are released and we unblock all
// requests waiting in Admit() for this stream's flow tokens in particular.
//
// NB: This is used when a replica on the other end of a stream gets caught
// up via snapshot (say, after a log truncation), where we then don't expect
// dispatches for the individual AdmittedRaftLogEntries between what it
// admitted last and its latest RaftLogPosition. Another use is during
// successive lease changes (out and back) within the same raft term -- we
// want to both free up tokens from when we lost the lease, and also ensure
// that attempts to return them (on hearing about AdmittedRaftLogEntries
// replicated under the earlier lease), we discard the attempts.
ReturnAllTokensUpto(context.Context, kvflowcontrolpb.RaftLogPosition, Stream)
// NB: This is typically used when we're no longer replicating data to a
// member of the raft group, because it's crashed, no longer part of the
// raft group, we've decided to pause it, we've truncated the raft log ahead
// of it and expect it to be caught up via snapshot, etc. In all these cases
// we don't expect dispatches for individual AdmittedRaftLogEntries between
// what it admitted last and its latest RaftLogPosition.
DisconnectStream(context.Context, Stream)
// Close closes the handle and returns all held tokens back to the
// underlying controller. Typically used when the replica loses its lease
// and/or raft leadership, or ends up getting GC-ed (if it's being
@@ -134,7 +155,7 @@ type DispatchWriter interface {
// piggybacking) has not taken place.
//
// NB: PendingDispatchFor is expected to remove dispatches from the pending
// list. If the GRPC stream we're sending it over happens to break, we drop
// list. If the gRPC stream we're sending it over happens to break, we drop
// these dispatches. The node waiting for these dispatches is expected to react to
// the stream breaking by freeing up all held tokens.
type DispatchReader interface {
10 changes: 1 addition & 9 deletions pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/BUILD.bazel
@@ -25,29 +25,21 @@ go_library(

go_test(
name = "kvflowcontroller_test",
srcs = [
"kvflowcontrol_token_adjustment_test.go",
"kvflowcontroller_simulation_test.go",
],
srcs = ["kvflowcontrol_token_adjustment_test.go"],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":kvflowcontroller"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/testutils/datapathutils",
"//pkg/util/admission/admissionpb",
"//pkg/util/asciitsdb",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_guptarohit_asciigraph//:asciigraph",
"@com_github_mkungla_bexp_v3//:bexp",
"@com_github_stretchr_testify//require",
],
)