
kvserver: avoid replicating to followers that are I/O overloaded #83851

Merged (2 commits) on Jul 15, 2022

Conversation

@tbg (Member) commented Jul 5, 2022

This commit implements the stop-gap solution to raft traffic contributing to
I/O overload that was discussed [1] in #79215.

The newly introduced admission.kv.pause_replication_io_threshold cluster
setting can be used to define an I/O overload score above which raft leaders
will attempt to pause replication to nonessential followers, to give them a
chance to tidy up their LSM.

A follower is "nonessential" if it is either a non-voter, or if we think (and
are fairly certain) that quorum can be reached without it. If there are
multiple overloaded followers, we pick as many as we can without losing quorum,
and we try to do so in a way that's a) random, i.e. different raft leaders
will pick a different set, to give each store a share of the relief, and b)
stable, i.e. each raft leader will pick the same set at least for a little
while, to avoid interrupting quorum by rapidly switching the set of active
followers (but see [2]).

The implementation of this is as follows:

  • on each Store, materialize (on each tick) a storeOverloadMap from gossip
  • on each Replica, on tick, compute from this map the set of followers to
    pause, and store it in a per-Replica map (as sketched below)
  • consult this latter map
    • when sending MsgApp from handleRaftReady
    • when releasing proposal quota.
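What follows is a minimal, self-contained sketch of that per-Replica
computation. All names (StoreID, computePausedFollowers, the seed parameter)
are illustrative assumptions rather than the actual implementation; the sketch
only demonstrates the quorum-preserving, random-but-stable selection described
above. Seeding the permutation with something leader-specific and slowly
changing (say, RangeID plus a coarse timestamp) is what gives the a)/b)
properties.

    package main

    import (
    	"fmt"
    	"math/rand"
    )

    type StoreID int32

    // computePausedFollowers pauses as many overloaded voting followers as
    // possible without endangering quorum. Non-voters (always nonessential)
    // are omitted for brevity. The seeded permutation makes the choice
    // random across leaders yet stable for a given leader and seed.
    func computePausedFollowers(
    	overload map[StoreID]float64, // gossiped I/O overload scores
    	threshold float64, // admission.kv.pause_replication_io_threshold
    	followers []StoreID, // voting followers, excluding the leader
    	quorum int, // voters required for quorum, leader included
    	seed int64, // e.g. derived from RangeID plus a coarse timestamp
    ) map[StoreID]bool {
    	paused := map[StoreID]bool{}
    	if threshold == 0 {
    		return paused // setting disabled: never pause anyone
    	}
    	active := len(followers) + 1 // voters still replicated to; +1 for the leader
    	rng := rand.New(rand.NewSource(seed))
    	for _, i := range rng.Perm(len(followers)) {
    		s := followers[i]
    		if overload[s] < threshold {
    			continue // not overloaded enough to pause
    		}
    		if active-1 < quorum {
    			break // pausing this follower would interrupt quorum
    		}
    		paused[s] = true
    		active--
    	}
    	return paused
    }

    func main() {
    	// Three voters (leader s1 plus followers s2, s3), quorum of two:
    	// at most one overloaded follower can be paused.
    	overload := map[StoreID]float64{2: 0.9, 3: 0.95}
    	fmt.Println(computePausedFollowers(overload, 0.8, []StoreID{2, 3}, 2, 42))
    }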

By default, this commit disables the new functionality by setting the cluster
setting to zero. This results in an empty storeOverloadMap and thus, after one
round of gossip and a subsequent tick (i.e. within seconds), an empty
per-Replica paused-followers map.

Additionally, it's worth pointing out the mixed-version behavior: the old
nodes' stores will be observed as gossiping a zero IOThreshold, which is
considered not overloaded, i.e. replication streams to old nodes will never be
paused.

Based on experiments (with an IOThreshold cutoff score of 0.8), more commits
were added to mitigate the impact of snapshots. With just the basic commit
described above, the overloaded store will - at least in the experiment -
manage to control the shape of the LSM, though barely. This is because once
followers are paused, some of the ranges undergo non-cooperative log
truncations, cutting the paused follower off from the log. This then initiates
a steady stream of (raft) snapshots to the follower, and much of this snapshot
data is ingested into L0, driving up the L0 file count (and thus penalizing
foreground traffic on n3, which is not causing any of this), as seen here
(red line):

[three charts: L0 file count and related LSM metrics on the overloaded store n3 (red line)]

On the plus side, even with that, the foreground traffic is not impacted by the
ailing follower. The chart below shows the p50, p99, and p100 for the kv0 workload
whose leases are on the non-throttled nodes n1 and n2. The first annotation is when
n3 gets unthrottled, and the second annotation marks it crashing (out of disk).

[chart: kv0 p50/p99/p100 latency on n1 and n2]

The first additional commit attempts to alleviate this issue by blocking (raft)
snapshots that are about to be sent to a paused follower. With it, the L0 file
count is under noticeably tighter control:

[chart: L0 file count with snapshot blocking enabled]

and this is due to the snapshots flowing only when followers are not paused:

[two charts: raft snapshot traffic alongside follower pause state]
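In sketch form, the snapshot-blocking commit amounts to holding a pending raft
snapshot until its recipient leaves the paused set. The names below are
assumptions for illustration; the actual change hooks into the snapshot send
path rather than a standalone helper.

    package kvserver

    import (
    	"context"
    	"time"
    )

    // waitUntilNotPaused delays a pending raft snapshot while the recipient
    // follower is in the paused set, giving its store time to recover
    // instead of ingesting snapshot data into L0. `paused` stands in for a
    // lookup in the per-Replica paused-followers map.
    func waitUntilNotPaused(ctx context.Context, paused func() bool) error {
    	t := time.NewTicker(time.Second)
    	defer t.Stop()
    	for paused() {
    		select {
    		case <-ctx.Done():
    			return ctx.Err() // give up; the caller retries the snapshot later
    		case <-t.C:
    		}
    	}
    	return nil
    }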

A now-removed additional commit raised the bar for non-cooperative log
truncation to the configured max range size while a paused follower is present.
In theory, this should significantly cut down on the number of replicas needing
snapshots, but in practice the experiments show that whenever the follower
de-loads for a brief moment (as it frequently does in this experiment, since
the overload is entirely driven by raft traffic), a non-cooperative truncation
using the old threshold occurs, and we lose the benefit of having delayed the
truncation in the first place. That attempt still helped a little (making
it somewhat less likely for the IOThreshold score to crack 1.0), but in this PR
the approach was extended to pause truncation before the follower enters the
pausable regime, and to do so regardless of the pausing status of the
follower. This operates on the assumption that if a follower is permanently
overloaded, it will never fall too far below the pausable threshold, and so we
would be delaying snapshots for as long as this could possibly make sense (see
the sketch below).
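For concreteness, the delayed-truncation heuristic boils down to something
like the following. The names and structure are assumptions for illustration,
and (as noted below) this logic is deferred to a follow-up PR rather than
shipped here.

    package kvserver

    // truncationByteThreshold sketches the delayed-truncation idea: while a
    // follower is in (or near) the pausable regime, retain up to the
    // configured max range size of raft log so the follower can catch up
    // via log entries rather than a snapshot. defaultThreshold corresponds
    // to the usual cap of 8 MB of raft log per replica.
    func truncationByteThreshold(
    	defaultThreshold, maxRangeBytes int64, followerNearPausableRegime bool,
    ) int64 {
    	if followerNearPausableRegime {
    		return maxRangeBytes
    	}
    	return defaultThreshold
    }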

This has a few potential downsides: First, retaining the raft log for
extended periods of time causes higher LSM write amp. I think this is
not a big problem - the old threshold, 8 MB of log per replica, is
already large enough to have most log entries flushed to L0, and so the
bulk of the write amplification will be incurred either way. Second,
many large raft logs might run the cluster out of disk; it would be
better to have an explicit trade-off between delaying truncations
and disk usage, but at present there is no such mechanism. Third, there
might be a performance penalty on the healthy nodes due to lower
cache efficiencies, both in pebble and the raft entry cache. And
fourth, catching up on many very long raft logs at the same time
could trigger known weaknesses in the raft transport related to memory
usage with increased likelihood.

In the end, we're not doing anything about delaying log truncations
in this PR, but are saving this for a follow-up PR, tracking this work
in #84467.

Fixes #79215.

Release note (ops change): the admission.kv.pause_replication_io_threshold
cluster setting can be set to a nonzero value to reduce I/O throughput on
followers that are driven towards an inverted LSM by replication traffic. The
functionality is disabled by default. A suggested value is 0.8, meaning that
replication traffic to nonessential followers is paused before these followers
will begin throttling their foreground traffic.
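For reference, assuming an otherwise default configuration, an operator would
enable this with a single statement, e.g.
SET CLUSTER SETTING admission.kv.pause_replication_io_threshold = 0.8;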

Footnotes

  1. https://github.com/cockroachdb/cockroach/issues/79215#issuecomment-1167857356

  2. https://github.com/cockroachdb/cockroach/issues/83920#issuecomment-1176536456

@cockroach-teamcity (Member)

This change is Reviewable

@tbg force-pushed the io-overload-drop-follower-traffic branch from c12eeea to 4141587 on July 6, 2022
@tbg force-pushed the io-overload-drop-follower-traffic branch 4 times, most recently from 8e7aa21 to b50dc61, on July 12, 2022
@tbg force-pushed the io-overload-drop-follower-traffic branch from b50dc61 to 09d3c7a on July 12, 2022
@tbg force-pushed the io-overload-drop-follower-traffic branch 3 times, most recently from 8907ce4 to 6a39a4c, on July 12, 2022
@tbg force-pushed the io-overload-drop-follower-traffic branch from 1ad7cdc to 677dbc5 on July 12, 2022
@tbg (Member, Author) commented Jul 13, 2022

The jury is still out on the (at time of writing) last commit, but other than that this has been fairly polished and is ready for a serious look.

@tbg marked this pull request as ready for review on July 13, 2022
@tbg requested a review from a team on July 13, 2022
@tbg requested reviews from a team as code owners on July 13, 2022
@tbg requested reviews from erikgrinaker and a team, and removed a request for a team, on July 13, 2022
@tbg (Member, Author) commented Jul 13, 2022

Notes on review: unfortunately I am out starting Friday, until Aug 6th. My suggestion would be to prioritize getting this merged, which predominantly means verifying that, with the default of zero for the cluster setting, there are no regressions. That way, it will be safe to merge. Of course the functionality should also be checked for bugs that can be discovered via code review, but I would prefer filing anything more sizeable away as issues and revisiting after PTO, to make sure this main PR can merge.

@sumeerbhola (Collaborator)

On the plus side, even with that, the foreground traffic is not impacted by the
ailing follower. The chart below shows the p50, p99, and p100 for the kv0 workload
whose leases are on the non-throttled nodes n1 and n2. The first annotation is when
n3 gets unthrottled, and the second annotation marks it crashing (out of disk).

I didn't understand this graph. What is on the Y axis? And what shows that the foreground traffic is not impacted? What change should I be noticing after the first and second annotations?

@tbg (Member, Author) commented Jul 13, 2022

@sumeerbhola you mean this graph, right?

[chart: kv0 p50/p99/p100 latency]

It's the p50/p99/p100 latency of kv0, with leases pinned to n1 and n2 (i.e. only replicating passively to the overloaded node n3). The graph shows that the tail latencies are basically independent of n3 being overloaded (since they don't get better at the end, when n3 has crashed).

@craig bot (Contributor) commented Jul 15, 2022

Build succeeded.

@joshimhoff (Collaborator)

@tbg & all, should we backport this to 22.1 (off by default, obviously), to be turned on in CC prod ahead of the 22.2 release? Doing so might make us more confident about the 22.2 release, and it is easy to flip a cluster setting fleet-wide in CC.

@tbg (Member, Author) commented Aug 17, 2022

Good idea in principle but there are tons of code changes here and a total of ~10 PRs - not something that backports.

irfansharif added a commit to irfansharif/cockroach that referenced this pull request Jan 23, 2023
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeable with large index backfills (cockroachdb#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of cockroachdb#79215, settling on cockroachdb#83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of cockroachdb#95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect to no longer need to bypass follower write work in admission
control and solve the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this first,
proto{col,buf}/interface-only PR, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, held at the node-level and holds all
     kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store
     we're sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will be held at the replica level (only
     on replicas that are both leaseholder and raft leader), and will be
     used to interface with the node-level kvflowcontrol.Controller.
     When replicating log entries, these replicas choose the log
     position (term+index) the data is to end up at, and use this handle
     to track the token deductions on a per log position basis. Later
     when freeing up tokens (after being informed of said log entries
     being admitted on the receiving end of the stream), it's done so by
     specifying the log position up to which we free up all deducted
     tokens.

   type Controller interface {
     Admit(admissionpb.WorkPriority, ...Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
     ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
     TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
     Close()
   }

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, which wants to
   eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in the raft
   entry (admission_origin_node from above) to know where to send these.
   This information is used on the origin node to release flow tokens that
   were acquired when replicating the original log entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) are waiting to be returned. The interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC (see the sketch after this
   list). Raft log entries have a prefix byte that informs decoding
   routines how to interpret the subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry section below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.

6. An unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData)
   to use replication admission control. Since we're using a different
   prefix byte for raft commands (see EntryEncodings above), one not
   recognized in earlier CRDB versions, we need explicit versioning.

7. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in the underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission pri. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
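
As a hedged illustration of the prefix-byte mechanism from item 5: the byte
values below are made up, and only the dispatch-on-first-byte mechanism
mirrors the description in the commit message.

    package kvserver

    type EntryEncoding byte

    // Illustrative constants; the actual prefix byte values differ.
    const (
    	EntryEncodingStandardWithoutAC   EntryEncoding = 0x00
    	EntryEncodingSideloadedWithoutAC EntryEncoding = 0x01
    	EntryEncodingStandardWithAC      EntryEncoding = 0x02
    	EntryEncodingSideloadedWithAC    EntryEncoding = 0x03
    )

    // subjectToAC reports whether an encoded raft entry carries
    // RaftAdmissionMeta, by peeking at its prefix byte. Empty entries
    // carry no metadata.
    func subjectToAC(data []byte) bool {
    	if len(data) == 0 {
    		return false
    	}
    	enc := EntryEncoding(data[0])
    	return enc == EntryEncodingStandardWithAC || enc == EntryEncodingSideloadedWithAC
    }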
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Jan 24, 2023
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Jan 25, 2023
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Jan 25, 2023
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Jan 25, 2023
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Jan 27, 2023
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeble with large index backfills (cockroachdb#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of cockroachdb#79215, settling on cockroachdb#83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of cockroachdb#95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect to no longer need to bypass follower write work in admission
control and solve the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this
proto{col,buf}/interface-only commit and the previous raft log encoding
commit, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, held at the node-level and holds all
     kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store
     we're sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will held at the replica-level (only
     on those who are both leaseholder and raft leader), and will be
     used to interface with the node-level kvflowcontrol.Controller.
     When replicating log entries, these replicas choose the log
     position (term+index) the data is to end up at, and use this handle
     to track the token deductions on a per log position basis. Later
     when freeing up tokens (after being informed of said log entries
     being admitted on the receiving end of the stream), it's done so by
     specifying the log position up to which we free up all deducted
     tokens.

   type Controller interface {
     Admit(admissionpb.WorkPriority, time.Time, Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, Stream) (deducted bool)
     ReturnTokens(admissionpb.WorkPriority, Tokens, Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority, time.Time)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     DeductedTokensUpto(Stream) kvflowcontrolpb.RaftLogPosition
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
     ReturnAllTokensUpto(kvflowcontrolpb.RaftLogPosition, Stream)
     Close()
   }

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, who wants to
   eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in raft
   entry (admission_origin_node from from above) to know where to
   send these to, which in turn will release flow tokens that were
   acquired when replicating the original log entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) are waiting to be returned. The interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have
   prefix byte that informs decoding routines how to interpret the
   subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry description below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.

6. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in the underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission pri. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
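
   As a sketch of the eventual call site (the handleRaftReady
   integration is left to a future PR; names below are illustrative):

   // While persisting ready.Entries to raft log storage:
   for _, ent := range ready.Entries {
     if entryUsesAC(ent) { // hypothetical check on the entry's prefix byte
       // Non-blocking: enqueues a virtual work item that's dequeued
       // once the store granter hands out IO tokens.
       controller.AdmitRaftEntry(tenantID, storeID, rangeID, ent)
     }
   }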

Release note: None
craig bot pushed a commit that referenced this pull request Jan 27, 2023
95637: kvflowcontrol,raftlog: interfaces for replication control r=irfansharif a=irfansharif

Follower replication work, today, is not subject to admission control. It consumes IO tokens without waiting, which both (i) does not prevent the LSM from being inverted, and (ii) can cause priority inversion where low-pri follower write work ends up causing IO token exhaustion, which in turn causes throughput and latency impact for high-pri non-follower write work on that same store. This latter behavior was especially noticeable with large index backfills (#82556) where >2/3rds of write traffic on stores could be follower work for large AddSSTs, causing IO token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of #79215, settling on #83851 which pauses replication traffic to stores close to exceeding their IO overload threshold (data that's periodically gossiped). In large index backfill experiments we found this to help slightly, but it's still a coarse and imperfect solution -- we're deliberately causing under-replication instead of being able to shape the rate of incoming writes for low-pri work closer to the origin.

As part of #95563 we're introducing machinery for "replication admission control" -- end-to-end flow control for replication traffic. With it we expect to no longer need to bypass follower write work in admission control and solve the issues mentioned above. Some small degree of familiarity with the design is assumed below. In this first, proto{col,buf}/interface-only PR, we introduce:

1. Package `kvflowcontrol{,pb}`, which will provide flow control for replication traffic in KV. It will be part of the integration layer between KV and admission control. In it we have a few central interfaces:

   - `kvflowcontrol.Controller`, held at the node level, which holds all `flowcontrol.Tokens` for each `flowcontrol.Stream` (one per store we're sending raft traffic to and tenant we're sending it for).
   - `kvflowcontrol.Handle`, which will be held at the replica level (only on replicas that are both leaseholder and raft leader), and will be used to interface with the node-level `kvflowcontrol.Controller`. When replicating log entries, these replicas choose the log position (term+index) the data is to end up at, and use this handle to track the token deductions on a per-log-position basis. Later, when freeing up tokens (after being informed of said log entries being admitted on the receiving end of the stream), we do so by specifying the log position up to which we free up all deducted tokens.
   
```go
type Controller interface {
  Admit(admissionpb.WorkPriority, ...Stream)
  DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
  ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
}

type Handle interface {
  Admit(admissionpb.WorkPriority)
  DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
  ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
  TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
  Close()
}
```
2. `kvflowcontrolpb.RaftAdmissionMeta` and relevant encoding/decoding routines. `RaftAdmissionMeta` is 'embedded' within a `kvserverpb.RaftCommand`, and includes necessary AC metadata on a per raft entry basis. Entries that contain this metadata will make use of the AC-specific raft log entry encodings described earlier. The AC metadata is decoded below-raft when looking to admit the write work. Also included is the node where this command originated, which wants to eventually learn of this command's admission.
```proto
message RaftAdmissionMeta {
  int32 admission_priority = ...;
  int64 admission_create_time = ...;
  int32 admission_origin_node = ...;
}
```
3. `kvflowcontrolpb.AdmittedRaftLogEntries`, which now features in `kvserverpb.RaftMessageRequest`, the unit of what's sent back-and-forth between two nodes over their two uni-directional raft transport streams. `AdmittedRaftLogEntries`, just like raft heartbeats, is coalesced information about all raft log entries that were admitted below raft. We'll use the origin node encoded in the raft entry (`admission_origin_node` from point 2 above) to know where to send these. This information is used on the origin node to release the flow tokens that were acquired when replicating the original log entries.
```proto
message AdmittedRaftLogEntries {
  int64 range_id = ...;
  int32 admission_priority = ...;
  RaftLogPosition up_to_raft_log_position = ...;
  uint64 store_id = ...;
}

message RaftLogPosition {
  uint64 term = ...;
  uint64 index = ...;
}
```
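To sketch the receiver side (the Go field names below are illustrative renderings of the proto fields): once entries up to some position have been admitted below raft, the receiving node records something like the following, to be routed back to the origin node (via `kvflowcontrol.Dispatch`, described next):
```go
admitted := kvflowcontrolpb.AdmittedRaftLogEntries{
	RangeID:             rangeID, // which range's entries were admitted
	AdmissionPriority:   pri,     // the priority they were admitted at
	UpToRaftLogPosition: pos,     // everything at or below is admitted
	StoreID:             storeID, // the local store that admitted them
}
```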
4. `kvflowcontrol.Dispatch`, which is used to dispatch information about admitted raft log entries (see point 3 above) to the specific nodes where (i) said entries originated, and (ii) flow tokens were deducted and are waiting to be returned. The interface is also used to read pending dispatches, which will be used in the raft transport layer when looking to piggyback information on traffic already bound to specific nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we allow querying for all long-overdue dispatches. The interface looks roughly like:
```go
type Dispatch interface {
  Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
  PendingDispatch() []roachpb.NodeID
  PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
}
```
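As a sketch of the intended raft transport integration (field and helper names here are hypothetical; the actual wiring comes in later PRs):
```go
// Receiver side: hand the coalesced record from above to the
// dispatcher, keyed by the origin node waiting to return flow tokens.
d.Dispatch(originNodeID, admitted)

// Sender side: before flushing a RaftMessageRequest to a given node,
// piggyback any pending dispatches destined for it.
req.AdmittedRaftLogEntries = d.PendingDispatchFor(destNodeID)

// Periodically, flush long-overdue dispatches for nodes we currently
// have no raft traffic to (sendRaftMessageWith is hypothetical).
for _, nodeID := range d.PendingDispatch() {
	sendRaftMessageWith(nodeID, d.PendingDispatchFor(nodeID))
}
```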
5. Two new encodings for raft log entries, `EntryEncoding{Standard,Sideloaded}WithAC`. Raft log entries have a prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, `EntryEncoding{Standard,Sideloaded}` (now renamed to `EntryEncoding{Standard,Sideloaded}WithoutAC`), to indicate whether the entry came with sideloaded data (these are typically AddSSTs, the storage for which is treated differently for performance). Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll admit the work without blocking.
    - We'll come back to this non-blocking admission in 7. below, even though the implementation is left for a future PR.
    - The decision to use replication admission control happens above raft, and AC-specific metadata is plumbed down as part of the marshaled raft command, as described in 2. above.

6. An unused version gate (`V23_1UseEncodingWithBelowRaftAdmissionData`) to use replication admission control. Since we're using a different prefix byte for raft commands (point 5. above), one not recognized in earlier CRDB versions, we need explicit versioning.
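A sketch of how points 5 and 6 might combine when picking an encoding for a new proposal (illustrative only, not the actual helper):
```go
func chooseEncoding(
	ctx context.Context, st *cluster.Settings, sideloaded, wantAC bool,
) raftlog.EntryEncoding {
	// Only emit the new prefix byte once all nodes can decode it.
	useAC := wantAC &&
		st.Version.IsActive(ctx, clusterversion.V23_1UseEncodingWithBelowRaftAdmissionData)
	switch {
	case sideloaded && useAC:
		return raftlog.EntryEncodingSideloadedWithAC
	case sideloaded:
		return raftlog.EntryEncodingSideloadedWithoutAC
	case useAC:
		return raftlog.EntryEncodingStandardWithAC
	default:
		return raftlog.EntryEncodingStandardWithoutAC
	}
}
```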

7. `AdmitRaftEntry`, on the `kvadmission.Controller` interface. We'll use this as the integration point for log entries received below raft, right as they're being written to storage. This will be non-blocking since we'll be below raft in the `raft.Ready()` loop, and will effectively enqueue a "virtual" work item in the underlying `StoreWorkQueue` mediating store IO. This virtual work item is what later gets dequeued once the store granter informs the work queue of newly available IO tokens. For standard work queue ordering, our work item needs to include the create time and admission pri. The tenant ID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation); the store ID to find the right store work queue on multi-store nodes. The `raftpb.Entry` encodes within it its origin node (see point 2 above), which is used post-admission to inform the right node of said admission. It looks like:
```go
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage.
AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
```

---

Here's how the various pieces fit together:

<img width="870" alt="image" src="https://user-images.githubusercontent.com/10536690/214457338-1521c94b-7d9a-4d50-8a29-80984d1a706a.png">



95715: bench: add separate-process tenant benchmarks r=yuzefovich a=yuzefovich

Some time ago we merged a change to run all benchmarks in `pkg/bench` against an in-memory tenant. Recently we introduced a local fastpath for in-process in-memory tenants which will resemble how things will look once we get to UA for single-tenant clusters. However, it is also interesting to measure how we perform with separate-process tenants (which is how we run serverless), so this commit introduces that additional config.

Epic: CRDB-14837

Release note: None

96027: ci: get rid of failures in make-based CI r=healthy-pod a=rickystewart

1. We don't post to GitHub from `make`-based CI jobs any more, and
   `github-post` doesn't build cleanly from `make` any more. I delete
   all the code related to `github-post` from `run_json_test`.
2. Delete `teamcity-check-genfiles.sh` which doesn't serve a purpose now
   that `vendor` is gone.

Epic: none
Release note: None

Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>