kvserver: throttle writes on followers #79215

Closed
erikgrinaker opened this issue Apr 1, 2022 · 29 comments · Fixed by #83851
Labels
A-admission-control A-kv-distribution Relating to rebalancing and leasing. A-kv-replication Relating to Raft, consensus, and coordination. C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) O-sre For issues SRE opened or otherwise cares about tracking.

Comments

@erikgrinaker
Contributor

erikgrinaker commented Apr 1, 2022

We currently throttle writes on the receiving store based on store health (e.g. via admission control or via specialized AddSSTable throttling). However, this only takes into account the local store health, and not the associated write cost on followers during replication, which isn't always throttled. We've seen this lead to hotspots where follower stores get overwhelmed, since the follower writes bypass admission control. A similar problem exists with snapshot application.

This has been touched on in several other issues as well:

Jira issue: CRDB-14642

Epic CRDB-15069

@erikgrinaker erikgrinaker added C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) A-kv-replication Relating to Raft, consensus, and coordination. T-kv-replication labels Apr 1, 2022
@sumeerbhola
Collaborator

sumeerbhola commented Apr 1, 2022

(tried to summarize some recent discussions. @nvanbenschoten @tbg -- please correct/add details).

Consider the following simple scenario, assuming the allocator has already been improved:

  • Store is a follower for ranges in the set R_f and leaseholder for the set R_l.
  • A background write job (index backfill etc.) starts for a subset of ranges in R_f.
    • Let's assume an N minute lag in the allocator responding and finishing shifting enough load away from this node. Even in an ideal world, N could be 5+ minutes (in order to reduce range movement).
  • Problem: need to ensure that user facing operations on R_l are completely unaffected in both throughput and latency.

Claims/Ideas (from discussions with @nvanbenschoten and @tbg)

  • Use admission control on the raft replication path at the follower. We would plumb the priority and tenant into the raft message.
  • Where exactly is admission control invoked, and do we queue or fail fast?
    • (by @tbg) The follower should reject the raft append if there is no capacity.
      • Need to avoid shared grpc stream for raft from the source node to destination store. We can't simply segment into K streams based on priority since a single replica could use different priorities. Stream per replica? What is the cost? ...
    • (by @nvanbenschoten) Append to the raft log without bothering with admission control. The raft log is unlikely to be the source of high read amp. Invoke admission control before applying to state machine. etcd/raft changes? Conf changes that need application to state machine before being effective? ...
    • (simple) Do something similar to what we do now in calling PreIngestDelay in addSSTablePreApply. This could cause raft heartbeats to fail. Also, all raft scheduler threads can be consumed by ranges in R_f, leaving no threads to apply for the user facing traffic.

@erikgrinaker
Contributor Author

Append to the raft log without bothering with admission control. The raft log is unlikely to be the source of high read amp. Invoke admission control before applying to state machine. etcd/raft changes? Conf changes that need application to state machine before being effective?

At a first glance, I think I'm partial to this solution. Some questions/concerns:

  • The Raft log has the benefit of being append-only. However, I assume Pebble currently won't handle this optimally because there are many Raft logs on the same Pebble store, and so the overall writes will be spread across a wide keyspan, meaning it will have to go through compaction as usual with the associated write amplification. With the ongoing Raft storage work, can we make log appends as cheap as possible, i.e. a single append write? Can/should we do this with Pebble, or do we need a different storage engine?

  • We need to ensure that log application gets priority when catching up a new leaseholder (i.e. when applying entries up to the current commit index).

  • Should the quota pool use the applied index rather than the commit index, to prevent followers from falling too far behind? Or, related to the above point, should a replica stop accepting new log entries if application falls too far behind?

@joshimhoff joshimhoff added the O-sre For issues SRE opened or otherwise cares about tracking. label Apr 7, 2022
@sumeerbhola
Collaborator

because there are many Raft logs on the same Pebble store, and so the overall writes will be spread across a wide keyspan, meaning it will have to go through compaction as usual with the associated write amplification.

The raft logs are in the range-id local key space, so all of them will be prefixed by LocalRangeIDPrefix, which will keep them away from all the state machine state. So Pebble compactions should do the right thing, and if not (say we do see L0 flushed sstables spanning from the range-id local key space to the global key space), we can investigate and fix this in Pebble by tweaking the flush split logic.

Should the quota pool use the applied index rather than the commit index, to prevent followers from falling too far behind? Or, related to the above point, should a replica stop accepting new log entries if application falls too far behind?

Let's ignore transient overload for now, since admission control on the store write path doesn't throttle until overload is persistent (and has accumulated significant work in L0). Stated another way, say store S3 with follower replica for range R1 can only handle application at 60% of the throughput of the other replicas (due to admission control). We should assume then that whatever budget we have for how far application at the follower is allowed to fall behind will be exhausted, and the mechanism we need is that which will allow the raft group to efficiently work at this 60% throughput (until the rebalancer shifts the follower elsewhere). So flow control all the way back to the leaseholder seems to be what we need, and not necessarily separation of append from application. This flow control would need to allow raft heartbeats and configuration changes to not get stuck behind other normal raft commands. And we would configure admission control at S3 such that raft configuration changes would bypass the admission queue.

@erikgrinaker
Contributor Author

erikgrinaker commented Apr 8, 2022

Pebble compactions should do the right thing, and if not (say we do see L0 flushed sstables spanning from the range-id local key space to the global key space), we can investigate and fix this in Pebble by tweaking the flush split logic.

What's the right thing here? Since all of the levels will be non-overlapping, will it avoid doing any compactions at all until the log gets truncated?

We should assume then that whatever budget we have for how far application at the follower is allowed to fall behind will be exhausted, and the mechanism we need is that which will allow the raft group to efficiently work at this 60% throughput (until the rebalancer shifts the follower elsewhere). So flow control all the way back to the leaseholder seems to be what we need, and not necessarily separation of append from application.

I'm not sure that's always desirable though. A 40% loss in throughput would not be acceptable in a lot of workloads, and we often get complaints of a "sick node" affecting the entire cluster in this way.

Let's assume that we have a 20-node cluster with 10000 ranges and RF=3. One of the nodes starts experiencing slowness (read amp or otherwise), with 60% throughput. This would affect 1500 ranges (15%) evenly distributed across all nodes, essentially impacting the entire workload if we assume some degree of cross-range txns. In this case, it would likely be better to give up on the follower and upreplicate elsewhere than to have 40% throughput reduction across this many ranges for however long it takes the allocator to move 1500 replicas. We often see this play out in escalations, where operators end up killing the node to force the upreplication.
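
The 1500-range figure above can be checked with a few lines of arithmetic. This is only an illustrative sketch: the function name is made up here, and it assumes replicas are spread perfectly evenly across nodes, which real placement only approximates.

```go
package main

import "fmt"

// rangesOnNode returns how many ranges have a replica on a single node,
// assuming replicas are spread perfectly evenly across the cluster
// (an idealization used only for this back-of-the-envelope estimate).
func rangesOnNode(nodes, ranges, replicationFactor int) int {
	totalReplicas := ranges * replicationFactor
	return totalReplicas / nodes
}

func main() {
	const nodes, ranges, rf = 20, 10000, 3
	affected := rangesOnNode(nodes, ranges, rf)
	pct := 100 * float64(affected) / float64(ranges)
	fmt.Printf("ranges with a replica on the slow node: %d (%.0f%%)\n", affected, pct)
}
```

With 20 nodes, 10000 ranges and RF=3 there are 30000 replicas, so each node holds 1500 of them, one per affected range.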

I think I'm leaning towards appending the log entries and locally throttling application for however long it takes, since this avoids log entry retransmits and leader log retention. But it's still a bit unclear how aggressive the quota pool/allocator should be in cutting off followers (@tbg has suggested considering the relative rates of the leader and followers). If we do lose the leaseholder, then we should prioritize application of these entries to catch up, but hopefully the allocator will step in before it gets that far.

@tbg has been looking into some similar issues with the current below-Raft throttling and Raft queues/buffers, and I think that work points in this direction too -- but he'll write up some of his results later.

@tbg
Member

tbg commented Apr 8, 2022

One short-term goal we should pursue is to avoid any below-raft throttling, i.e. this

func addSSTablePreApply(
	ctx context.Context,
	st *cluster.Settings,
	eng storage.Engine,
	sideloaded SideloadStorage,
	term, index uint64,
	sst kvserverpb.ReplicatedEvalResult_AddSSTable,
	limiter *rate.Limiter,
) bool {
	checksum := util.CRC32(sst.Data)
	if checksum != sst.CRC32 {
		log.Fatalf(
			ctx,
			"checksum for AddSSTable at index term %d, index %d does not match; at proposal time %x (%d), now %x (%d)",
			term, index, sst.CRC32, sst.CRC32, checksum, checksum,
		)
	}
	path, err := sideloaded.Filename(ctx, index, term)
	if err != nil {
		log.Fatalf(ctx, "sideloaded SSTable at term %d, index %d is missing", term, index)
	}
	tBegin := timeutil.Now()
	var tEndDelayed time.Time
	defer func() {
		if dur := timeutil.Since(tBegin); dur > addSSTPreApplyWarn.threshold && addSSTPreApplyWarn.ShouldLog() {
			log.Infof(ctx,
				"ingesting SST of size %s at index %d took %.2fs (%.2fs on which in PreIngestDelay)",
				humanizeutil.IBytes(int64(len(sst.Data))), index, dur.Seconds(), tEndDelayed.Sub(tBegin).Seconds(),
			)
		}
	}()
	eng.PreIngestDelay(ctx)
	tEndDelayed = timeutil.Now()
	copied := false
	if eng.InMem() {
		// Ingest a copy of the SST. Otherwise, Pebble will claim and mutate the
		// sst.Data byte slice, which will also be used later by e.g. rangefeeds.
		data := make([]byte, len(sst.Data))
		copy(data, sst.Data)
		path = fmt.Sprintf("%x", checksum)
		if err := eng.WriteFile(path, data); err != nil {
			log.Fatalf(ctx, "unable to write sideloaded SSTable at term %d, index %d: %s", term, index, err)
		}
	} else {
		ingestPath := path + ".ingested"
		// The SST may already be on disk, thanks to the sideloading
		// mechanism. If so we can try to add that file directly, via a new
		// hardlink if the filesystem supports it, rather than writing a new
		// copy of it. We cannot pass it the path in the sideload store as
		// the engine deletes the passed path on success.
		if linkErr := eng.Link(path, ingestPath); linkErr == nil {
			ingestErr := eng.IngestExternalFiles(ctx, []string{ingestPath})
			if ingestErr != nil {
				log.Fatalf(ctx, "while ingesting %s: %v", ingestPath, ingestErr)
			}
			// Adding without modification succeeded, no copy necessary.
			log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, ingestPath)
			return false
		}
		path = ingestPath
		log.Eventf(ctx, "copying SSTable for ingestion at index %d, term %d: %s", index, term, path)
		// TODO(tschottdorf): remove this once sideloaded storage guarantees its
		// existence.
		if err := eng.MkdirAll(filepath.Dir(path)); err != nil {
			panic(err)
		}
		if _, err := eng.Stat(path); err == nil {
			// The file we want to ingest exists. This can happen since the
			// ingestion may apply twice (we ingest before we mark the Raft
			// command as committed). Just unlink the file (the storage engine
			// created a hard link); after that we're free to write it again.
			if err := eng.Remove(path); err != nil {
				log.Fatalf(ctx, "while removing existing file during ingestion of %s: %+v", path, err)
			}
		}
		if err := writeFileSyncing(ctx, path, sst.Data, eng, 0600, st, limiter); err != nil {
			log.Fatalf(ctx, "while ingesting %s: %+v", path, err)
		}
		copied = true
	}
	if err := eng.IngestExternalFiles(ctx, []string{path}); err != nil {
		log.Fatalf(ctx, "while ingesting %s: %+v", path, err)
	}
	log.Eventf(ctx, "ingested SSTable at index %d, term %d: %s", index, term, path)
	return copied
}

In my experiments, I see this contribute significantly to memory pressure on the node and generally ... obfuscate what's going on. Holding raftMu for extended periods of time (I've seen 477s, which could be caused by the rate limiter shared across arbitrary numbers of ranges) leads to lots of SSTs queuing up at the raft scheduler on top of many other issues that stem from holding mutexes for extended periods of time (try to get a range status or report metrics on this replica in the meantime, for example). In a sense, this simulates what a "truly bad" system would look like and there should ultimately be backpressure all the way up to the leader, but I think it would be a step-change if we replaced this below-raft throttling by avoiding attempts to apply these entries in the first place. Instead of rate limiting / delaying, we'd determine the max size of entries to apply on a per-Ready basis, based on how much we're willing to ingest/add to the storage engine (this requires some raft changes). Then, whatever we pull to apply, we apply as fast as possible.
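
To make the per-Ready budget idea concrete, here is a minimal sketch of choosing which committed entries to hand to application under a byte budget. The `entry` type, `takeForApply`, and the budget value are hypothetical names for illustration; this is not the etcd/raft API, and as noted above the real thing would need raft changes.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a committed raft log entry.
type entry struct {
	index uint64
	size  int // payload size in bytes
}

// takeForApply returns the longest prefix of committed entries whose total
// size fits within budgetBytes. When any entry is available it returns at
// least one, so a single oversized entry cannot stall application forever.
func takeForApply(committed []entry, budgetBytes int) []entry {
	used := 0
	for i, e := range committed {
		if i > 0 && used+e.size > budgetBytes {
			return committed[:i]
		}
		used += e.size
	}
	return committed
}

func main() {
	committed := []entry{{10, 400}, {11, 300}, {12, 500}, {13, 100}}
	batch := takeForApply(committed, 800)
	fmt.Printf("applying %d entries, up to index %d\n", len(batch), batch[len(batch)-1].index)
}
```

Whatever is returned would then be applied without further delay; entries past the budget simply stay in the log until a later pass.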

When to "cut off" followers is, of course, related, but I think we should be careful to discuss them somewhat separately to keep the concerns clear. I don't have an easy answer here but it feels right to me to make that call based on a velocity difference. If a follower is only 0.5% slower than the others, over time it will fall behind by arbitrary amounts, so an absolute "behind size" isn't a good cutoff. But if a follower is 30% slower than the others, we may want to cut it off once it's fallen enough to start throttling in the quota pool, and we may want to do so not only because it wrecks the range's performance, but also because it may not be able to handle the load we're sending it. To shed load, then, all the follower would have to do is to stop consuming log entries, and we can discuss how sophisticated this mechanism has to be. I would expect that it wouldn't have to be too sophisticated.
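
A rough sketch of a cutoff based on velocity difference rather than absolute lag might look like the following. The function, its parameters, and the 30% threshold are assumptions for illustration only; how the rates would be measured is left open.

```go
package main

import "fmt"

// shouldCutOff reports whether a follower should be cut off, based on the
// relative difference between the leader's and the follower's progress rates
// (for example, bytes applied per second) rather than on an absolute lag.
// maxSlowdown is a hypothetical tuning knob: 0.30 means "30% slower".
// quotaThrottling says whether the follower's lag is already causing the
// quota pool to throttle proposals.
func shouldCutOff(leaderRate, followerRate, maxSlowdown float64, quotaThrottling bool) bool {
	if leaderRate <= 0 || !quotaThrottling {
		return false
	}
	slowdown := (leaderRate - followerRate) / leaderRate
	return slowdown >= maxSlowdown
}

func main() {
	// 0.5% slower: falls behind eventually, but is not cut off.
	fmt.Println(shouldCutOff(1000, 995, 0.30, true))
	// 40% slower and already throttling the quota pool: cut off.
	fmt.Println(shouldCutOff(1000, 600, 0.30, true))
}
```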

This all may not be enough if we're trying to handle the case (as we ought to, ultimately) in which a quorum of followers are getting very slow (and perhaps we're waiting for vertical scaling to occur, etc). Before rendering the range completely unavailable, the leader would need to pick a quorum and make sure to keep it up to date, meaning that these followers would see a steady stream of incoming appends that they may not be able to handle at that rate. So here explicit backpressure would be better than, say, the follower just dropping appends. But again, the appends are not generally the problem, but the log application, and the log application is under control of the follower.

Would be helpful if we rallied around a number of examples which we want to address, which we can prioritize and see how various solutions address them.

Here's an attempt:

  • follower(s) are slightly slower than leaseholder, but not by much: want everything to work like today. Quota pool eventually empties out, and so write throughput tracks that of the closest follower.
  • a minority of followers is "much slower" than the leaseholder (perhaps as a result of overload or disk problems). We want to sacrifice these followers, i.e. we stop keeping them up to date and make "fast" progress, albeit with reduced availability and alarm bells going off for the operator, until the slowness subsides (how to determine that?) or the replicas get replaced.
  • a majority of nodes is "much slower". We need to treat this like the "slightly slower" case, picking a quorum that we stick with (& where we go at the slowest speed, via quota pool) and abandon whatever followers weren't picked.
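
The three cases above can be told apart by comparing the number of "much slower" voters against the quorum size. The sketch below is only an illustration: the names are made up, and deciding what counts as "much slower" is the open question discussed earlier.

```go
package main

import "fmt"

// Hypothetical labels for the three behaviors listed above.
const (
	keepAll        = "keep all followers; throughput tracks the quota pool"
	sacrificeSlow  = "stop replicating to the slow minority"
	pickSlowQuorum = "pick a quorum, go at its slowest speed, abandon the rest"
)

// plan picks a behavior given the number of voters (including the leader)
// and how many of them are "much slower" than the leaseholder.
func plan(voters, muchSlower int) string {
	quorum := voters/2 + 1
	switch {
	case muchSlower == 0:
		return keepAll
	case voters-muchSlower >= quorum:
		// The fast replicas alone still form a quorum.
		return sacrificeSlow
	default:
		// Quorum needs at least one slow replica.
		return pickSlowQuorum
	}
}

func main() {
	fmt.Println(plan(3, 0))
	fmt.Println(plan(3, 1))
	fmt.Println(plan(5, 3))
}
```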

@sumeerbhola
Collaborator

What's the right thing here? Since all of the levels will be non-overlapping, will it avoid doing any compactions at all until the log gets truncated?

I am not sure what is meant by "all of the levels will be non-overlapping". I was merely remarking that we will hopefully not see many flushed sstables spanning from the range-id local key space to the global key space to the state machine. A consequence of that is that if appends to the raft log get deleted "soon", we will see less write amplification for that part of the key space. It doesn't mean there will be no compactions, but it may be that after reaching Lbase, the log entries get deleted. But if we are just appending to the raft log and not applying, the replica can't truncate the raft log, so the write amplification of the raft log could get as bad as that of the state machine. In that case the compactions that are falling behind in dealing with the write amp of the state machine, thereby increasing read amp, will be stressed further by the raft log.

I'm not sure that's always desirable though. A 40% loss in throughput would not be acceptable in a lot of workloads, and we often get complaints of a "sick node" affecting the entire cluster in this way.
Let's assume that we have a 20-node cluster with 10000 ranges and RF=3. One of the nodes starts experiencing slowness (read amp or otherwise), with 60% throughput. This would affect 1500 ranges (15%) evenly distributed across all nodes

btw, my 60% comment and flow control was in response to the idea "should a replica stop accepting new log entries if application falls too far behind?". What I am claiming is:

  • If there is some maximum lag in application after which appends will stop, then it should either be long enough to allow the allocator to take action (5min?), or we should assume that the lag will be reached and we need to degrade gracefully after that.
  • 60% capacity aggregated across all the replicas in a store may be ok: If we are plumbing tenant info and priority through raft replication, both high priority and inter-tenant fairness should be respected on the store that is falling behind. For example, if we are suddenly seeing a 2x load spike because of an index backfill that is hitting 100 of the follower replicas on the store, if we arrange the code correctly we should see only those ranges getting throttled via flow control all the way back to their leaseholders.

It would be nice to see a very clear articulation of requirements/desired high-level behavior (elaborating on the ones @tbg stated), design a clean solution with minimal mechanisms, and then work backwards to the intermediate steps.

@erikgrinaker
Contributor Author

What's the right thing here? Since all of the levels will be non-overlapping, will it avoid doing any compactions at all until the log gets truncated?

I am not sure what is meant by "all of the levels will be non-overlapping". I was merely remarking that we will hopefully not see many flushed sstables spanning from the range-id local key space to the global key space to the state machine. A consequence of that is that if appends to the raft log get deleted "soon", we will see less write amplification for that part of the key space. It doesn't mean there will be no compactions, but it may be that after reaching Lbase, the log entries get deleted. But if we are just appending to the raft log and not applying, the replica can't truncate the raft log, so the write amplification of the raft log could get as bad as that of the state machine. In that case the compactions that are falling behind in dealing with the write amp of the state machine, thereby increasing read amp, will be stressed further by the raft log.

Yeah, this worries me too. In #78412, I saw severe performance degradation in cases where the Raft log lingered, and this would only exacerbate the problem if we halt application but keep appending to the log when we see store overload.

Since the Raft log is sequential in space and time, we could end up with e.g. 1-10 in L2, 11-20 in L1, and 21-30 in L0. There is marginal benefit in compacting this, since it will only be read once and eventually removed. Can/should Pebble be smart about this?

The WAL+LSM overhead seems unfortunate and unnecessary here. In the optimal case, we would have a single append write for each log entry, and a cheap way to remove the tail of the log. Have we considered using a different storage backend for the Raft log?

I'm not sure that's always desirable though. A 40% loss in throughput would not be acceptable in a lot of workloads, and we often get complaints of a "sick node" affecting the entire cluster in this way.
Let's assume that we have a 20-node cluster with 10000 ranges and RF=3. One of the nodes starts experiencing slowness (read amp or otherwise), with 60% throughput. This would affect 1500 ranges (15%) evenly distributed across all nodes

btw, my 60% comment and flow control was in response to the idea "should a replica stop accepting new log entries if application falls too far behind?". What I am claiming is:

  • If there is some maximum lag in application after which appends will stop, then it should either be long enough to allow the allocator to take action (5min?), or we should assume that the lag will be reached and we need to degrade gracefully after that.
  • 60% capacity aggregated across all the replicas in a store may be ok: If we are plumbing tenant info and priority through raft replication, both high priority and inter-tenant fairness should be respected on the store that is falling behind. For example, if we are suddenly seeing a 2x load spike because of an index backfill that is hitting 100 of the follower replicas on the store, if we arrange the code correctly we should see only those ranges getting throttled via flow control all the way back to their leaseholders.

Yeah, there's a difference here between slowdowns due to ingest volume (which should ideally only affect the target ranges) and slowdown due to node problems (which affects all ranges on that node).

Another difficulty here is that bulk ingestion is typically done with a lower priority, but I don't think we can respect these priorities at the Raft level because we can't reorder the writes. We can likely use this to prioritize different replicas for application, but we would have head-of-line blocking on the range.

It would be nice to see a very clear articulation of requirements/desired high-level behavior (elaborating on the ones @tbg stated), design a clean solution with minimal mechanisms, and then work backwards to the intermediate steps.

I think we should draft a design doc around this, for short- and long-term solutions, as there are several issues we need to consider. @tbg would you be willing to get one started, since you're already pretty deep in these issues? I need to stay focused on MVCC range tombstones, and will be OOO for a week as well.

@tbg
Member

tbg commented Apr 11, 2022

I think we should draft a design doc around this, for short- and long-term solutions, as there are several issues we need to consider. @tbg would you be willing to get one started, since you're already pretty deep in these issues? I need to stay focused on MVCC range tombstones, and will be OOO for a week as well.

👍🏽

@jbowens
Collaborator

jbowens commented Apr 11, 2022

Since the Raft log is sequential in space and time, we could end up with e.g. 1-10 in L2, 11-20 in L1, and 21-30 in L0. There is marginal benefit in compacting this, since it will only be read once and eventually removed. Can/should Pebble be smart about this?

The log is keyed by range, so it consists of many short sequential runs, right?

I was merely remarking that we will hopefully not see many flushed sstables spanning from the range-id local key space to the global key space to the state machine.

The min-overlapping ratio heuristic used in L1 and lower prioritizes compacting files that are large with respect to the data beneath them. This seems undesirable for ephemeral data like the raft log. The incoming data is 'dense,' as opposed to the sparser distribution you'd get with, e.g., random writes across the keyspace. If raft log entries survive into L1 and accumulate, it seems like they'd be prime candidates for compaction :/

@erikgrinaker
Contributor Author

Since the Raft log is sequential in space and time, we could end up with e.g. 1-10 in L2, 11-20 in L1, and 21-30 in L0. There is marginal benefit in compacting this, since it will only be read once and eventually removed. Can/should Pebble be smart about this?

The log is keyed by range, so it consists of many short sequential runs, right?

Correct. If we were to use a new/different storage engine for this, we could possibly store them as separate Raft logs, since cheap appends and truncates would presumably be easier to handle that way. That would require frequent seeks across different logs, but maybe that doesn't matter much with SSDs.

@nvanbenschoten
Member

nvanbenschoten commented Apr 11, 2022

Conf changes that need application to state machine before being effective?

Apply-time config changes are problematic for solutions that try to retain voting capabilities on followers while throttling log application. The idea here would be to decouple these concerns so that range availability (i.e. the ability for the range to reach quorum) has no dependency on admission control on followers. In other words, a follower could continue to append entries to its log and vote in elections even if its applied state lagged arbitrarily behind.

The concern is that the invariants outlined in @tbg's comment in etcd-io/etcd#7625 (comment) would lead to subtle dependencies from range availability to entry application on followers. We wouldn't want for a follower's applied state to lag far behind its log without affecting its range's availability in the common case, but then in an edge case for a config change (for example) to suddenly block that range's availability for an indefinite amount of time as application catches up. These three invariants don't directly introduce such a dependency for followers, but I think the Campaign invariant is the sticking point. If all followers in the majority quorum were allowed to fall arbitrarily far behind on application, the failure of the range's leader could lead to a period of time where no eligible replicas (those with the full committed log) are permitted to campaign.

So if range availability does need to have a dependency on each follower's admission control state, I think we'd want that interaction to be reflected in the common case and not just in obscure edge cases. That way, we can immediately start pushing back on proposals instead of building up a large amount of application debt that could stall a range entirely for an indefinite amount of time under just the right circumstances.

@erikgrinaker
Contributor Author

If all followers in the majority quorum were allowed to fall arbitrarily far behind on application, the failure of the range's leader could lead to a period of time where no eligible replicas (those with the full committed log) are permitted to campaign.

Doesn't this just mean that the leader must start throttling proposals when a quorum of voters are behind on conf change application, as opposed to when a single voter is behind?

Of course, this means that if 2/5 voters are behind on application, and we lose one of the up-to-date replicas, then the leader will immediately begin throttling -- possibly for a long time.

I think we'd want that interaction to be reflected in the common case and not just in obscure edge cases. That way, we can immediately start pushing back on proposals instead of building up a large amount of application debt that could stall a range entirely for an indefinite amount of time under just the right circumstances.

I'm not sure I fully agree. There's definitely a risk that we build up too much application debt and fall off a cliff if we suddenly have to repay that. But it also seems bad to always pay a penalty when a follower is behind to avoid the rare case where we'd have to suddenly catch up that follower.

I think this really comes down to how fast the allocator (or some other system) is able to resolve the problem. I think I would be fine with allowing a minority of followers to lag for long enough that we can reasonably expect the problem to be resolved (typically by upreplicating elsewhere), but not allow them to fall behind indefinitely. So we would throttle proposals as a function of the median and maximum replica lag.
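
As a sketch of what "a function of the median and maximum replica lag" could mean, the snippet below throttles when either value exceeds a limit. The limits and names are hypothetical. It uses the upper median, which is the smallest lag bound satisfied by a quorum of replicas, so the median check effectively asks whether some quorum is keeping up.

```go
package main

import (
	"fmt"
	"sort"
)

// shouldThrottle decides whether to throttle proposals given each replica's
// application lag (in log entries). Both limits are hypothetical knobs:
// medianLimit bounds how far a quorum may lag, and maxLimit bounds how far
// any single replica may lag before we stop tolerating it.
func shouldThrottle(lags []uint64, medianLimit, maxLimit uint64) bool {
	if len(lags) == 0 {
		return false
	}
	sorted := append([]uint64(nil), lags...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	median := sorted[len(sorted)/2] // upper median: a quorum lags at most this much
	worst := sorted[len(sorted)-1]
	return median > medianLimit || worst > maxLimit
}

func main() {
	// One follower lags, but within the maximum: no throttling.
	fmt.Println(shouldThrottle([]uint64{0, 10, 5000}, 100, 10000))
	// A quorum is behind: throttle.
	fmt.Println(shouldThrottle([]uint64{0, 500, 5000}, 100, 10000))
	// A single follower has fallen too far behind: throttle.
	fmt.Println(shouldThrottle([]uint64{0, 10, 20000}, 100, 10000))
}
```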

This also implies that a lagging replica with lag <= the median lag should get application priority (which also implies that a new leader/leaseholder gets priority). However, I'm not sure if we can reliably determine the global median lag for the range, especially not if the local replica is behind on conf changes and has an outdated view of the range configuration.

@sumeerbhola
Collaborator

This also implies that a lagging replica with lag <= the median lag should get application priority

I understand the thinking behind this, but I would prefer if we first tried to orient our thinking on the overall behavior of the system and not whether a range is getting worse performance, and think fully through where that leads us. In that way of thinking, what should get application priority ought to be dictated by inter-tenant fairness and secondarily by priority of work. It is good that a range does not span tenants. Priority of the work of course can vary within a range, and we are subject to head-of-line blocking when applying for a range, but I was positing earlier that this may also be acceptable for many of the common scenarios when a range is only receiving low priority bulk work.
Adding additional criteria like who is lagging by what amount creates tension between the various goals, which would be really nice to avoid.

tbg added a commit to tbg/cockroach that referenced this issue Apr 28, 2022
This commit adds a few new metrics that should be helpful in diagnosing
OOMs such as seen in cockroachdb#80155, and, down the road, for experimenting with
(and ultimately implementing) solutions to cockroachdb#79215.

`cr.store.raft.rcvd.queued_bytes`: gauge (sum of size of all entries waiting to be handed to raft)
`cr.store.raft.rcvd.stepped_bytes`: counter (sum of size of all entries handed to RawNode.Step)
`cr.store.raft.rcvd.dropped_bytes`: counter (sum of size of all entries that were dropped because recv queue filled up)

Release note: None
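
To illustrate how the gauge and the two counters relate, here is a toy model of a bounded receive queue. Everything in it is an assumption for illustration (the type, the byte-based bound, the method names); it is not the kvserver implementation.

```go
package main

import "fmt"

// recvQueue is a toy model of a bounded raft receive queue. The byte-based
// bound is an assumption made for this sketch.
type recvQueue struct {
	maxBytes     int64
	pending      []int64 // sizes of queued entries
	queuedBytes  int64   // gauge: bytes waiting to be handed to raft
	steppedBytes int64   // counter: bytes handed to raft so far
	droppedBytes int64   // counter: bytes dropped because the queue was full
}

// enqueue adds an entry of the given size, or drops it if the queue is full.
func (q *recvQueue) enqueue(size int64) bool {
	if q.queuedBytes+size > q.maxBytes {
		q.droppedBytes += size
		return false
	}
	q.pending = append(q.pending, size)
	q.queuedBytes += size
	return true
}

// stepAll hands every queued entry to raft and empties the queue.
func (q *recvQueue) stepAll() {
	for _, size := range q.pending {
		q.steppedBytes += size
	}
	q.pending = q.pending[:0]
	q.queuedBytes = 0
}

func main() {
	q := &recvQueue{maxBytes: 1000}
	q.enqueue(600)
	q.enqueue(600) // does not fit: dropped
	q.stepAll()
	fmt.Println(q.queuedBytes, q.steppedBytes, q.droppedBytes)
}
```

The gauge goes up and down with the queue contents, while the counters only ever grow.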
tbg added a commit to tbg/cockroach that referenced this issue May 6, 2022
tbg added a commit to tbg/cockroach that referenced this issue May 11, 2022
craig bot pushed a commit that referenced this issue Jul 15, 2022
83851: kvserver: avoid replicating to followers that are I/O overloaded r=erikgrinaker a=tbg

This commit implements the stop-gap solution to raft traffic contributing to
I/O overload that was discussed[^1] in #79215.

The newly introduced `admission.kv.pause_replication_io_threshold` cluster
setting can be used to define an I/O overload score above which raft leaders
will attempt to pause replication to nonessential followers, to give them a
chance at tidying up their LSM.

A follower is "nonessential" if it is either a non-voter, or if we think (and
are fairly certain that) quorum can be reached without it. If there are
multiple overloaded followers, we pick as many as we can without losing quorum,
and we try to do so in a way that's a) random, i.e. different raft leaders
will pick a different set, to give each store a share of the relief, and b)
stable, i.e. each raft leader will pick the same set at least for a little
while, to avoid interrupting a quorum by rapidly switching the set of active
followers (but see here[^2]).

The implementation of this is as follows:
- on Store, materialize (on each tick) a `storeOverloadMap` from gossip
- each Replica, on tick, from this map compute the set of followers to pause,
  and store it in a per-Replica map.
- consult this latter map
    - when sending `MsgApp` from `handleRaftReady`
    - when releasing proposal quota.
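
The selection of followers to pause could be sketched roughly as follows. This is a simplified illustration, not the code in the PR: `pickPaused`, `follower`, and the `seed` parameter are hypothetical names, and the sketch assumes that every voter that is not overloaded is live and caught up, whereas the real logic also has to be "fairly certain" that quorum can be reached.

```go
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// follower is a hypothetical, simplified replica descriptor.
type follower struct {
	storeID int
	voter   bool
}

// pickPaused returns the set of store IDs to pause. Overloaded non-voters
// are always paused. Overloaded voters are paused only as long as a quorum
// of voters stays active. The seed makes the choice random across leaders
// but stable for one leader (assumption: the caller derives the seed from
// something that changes rarely).
func pickPaused(leader int, replicas []follower, overloaded map[int]bool, seed int64) map[int]bool {
	paused := map[int]bool{}
	numVoters := 0
	var candidates []int
	for _, r := range replicas {
		if r.voter {
			numVoters++
		}
		if r.storeID == leader || !overloaded[r.storeID] {
			continue
		}
		if !r.voter {
			paused[r.storeID] = true // non-voters never count toward quorum
			continue
		}
		candidates = append(candidates, r.storeID)
	}
	quorum := numVoters/2 + 1
	budget := numVoters - quorum // voters we can lose while keeping quorum

	// Sort first so the seeded shuffle is deterministic for a given input.
	sort.Ints(candidates)
	rng := rand.New(rand.NewSource(seed))
	rng.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})
	for i := 0; i < len(candidates) && i < budget; i++ {
		paused[candidates[i]] = true
	}
	return paused
}

func main() {
	replicas := []follower{{1, true}, {2, true}, {3, true}, {4, false}}
	overloaded := map[int]bool{2: true, 3: true, 4: true}
	fmt.Println(pickPaused(1, replicas, overloaded, 42))
}
```

With three voters the quorum is two, so at most one of the two overloaded voters is paused, while the overloaded non-voter is paused unconditionally.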

This commit by default disables this new functionality by setting the cluster
setting to zero. This has the effect of an empty `storeOverloadMap` and thus,
after one round of gossip and subsequent tick (i.e. within seconds), an empty
per-Replica paused followers map.

Additionally, it's worth pointing out the mixed-version behavior: the old
nodes' stores will be observed as gossiping a zero IOThreshold, which is
considered not overloaded, i.e. replication streams to old nodes will never be
paused.
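
The effect of a zero threshold and of old nodes gossiping a zero score can be illustrated with a small sketch. The function `buildOverloadMap` and its signature are hypothetical; the actual `storeOverloadMap` is materialized from gossip and carries more state.

```go
package main

import "fmt"

// buildOverloadMap returns the stores whose gossiped I/O overload score is
// above the threshold. A threshold of zero disables the mechanism and yields
// an empty map. Names and types here are illustrative assumptions.
func buildOverloadMap(scores map[int]float64, threshold float64) map[int]float64 {
	out := map[int]float64{}
	if threshold <= 0 {
		return out // feature disabled
	}
	for storeID, score := range scores {
		if score > threshold {
			out[storeID] = score
		}
	}
	return out
}

func main() {
	// Store 1 stands in for an old-version node that gossips a zero score.
	scores := map[int]float64{1: 0.0, 2: 0.5, 3: 0.95}
	fmt.Println(len(buildOverloadMap(scores, 0)))   // disabled
	fmt.Println(len(buildOverloadMap(scores, 0.8))) // only store 3 qualifies
}
```

Because a zero score can never exceed a positive threshold, stores on old nodes never appear in the map in this sketch, matching the mixed-version behavior described above.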

Owing to [experiments] (with an IOThreshold cutoff score of 0.8), more commits
were added to improve on the impact of snapshots. With just the basic commit
described above, the overloaded store will - at least in the experiment -
manage to control the shape of the LSM, though barely. This is because once
followers are paused, some of the ranges will undergo non-cooperative log
truncations, meaning that the paused follower will be cut off from the log.
This then initiates a steady stream of (raft) snapshots to the follower, and
many of these snapshots ingest into L0, driving up the L0 **file** count (and
thus penalizing foreground traffic to n3, which is not causing any of this), as
seen here (red line):

<img width="816" alt="image"
src="https://user-images.githubusercontent.com/5076964/178611060-dfbe88ce-567e-46fd-92d4-1b037e984efb.png">

<img width="817" alt="image"
src="https://user-images.githubusercontent.com/5076964/178611028-b36859b1-758b-4ea9-aeb3-d6d0081278f0.png">

<img width="820" alt="image"
src="https://user-images.githubusercontent.com/5076964/178611147-142e34ce-15c2-4467-8b88-5449585391b7.png">

On the plus side, even with that, the foreground traffic is not impacted by the
ailing follower. The chart below shows the p50, p99, and p100 for the kv0 workload
whose leases are on the non-throttled nodes n1 and n2. The first annotation is when
n3 gets unthrottled, and the second annotation marks it crashing (out of disk).

<img width="1223" alt="image" src="https://user-images.githubusercontent.com/5076964/178736513-e2ab12e1-a4c9-4ccd-8152-e853f6c051fd.png">

The first additional commit attempts to alleviate this issue by blocking (raft)
snapshots that are about to be sent to a paused follower. With it, the L0 file
count is under noticeably tighter control:

<img width="815" alt="image"
src="https://user-images.githubusercontent.com/5076964/178611379-8de5b6bb-9c33-4ade-a925-55f0b849d175.png">

and this is due to the snapshots flowing only when followers are not paused:

<img width="817" alt="image"
src="https://user-images.githubusercontent.com/5076964/178611479-999813b2-7dae-4cf6-a9c0-f637967743fb.png">

<img width="820" alt="image"
src="https://user-images.githubusercontent.com/5076964/178611537-ebbee74d-7cb0-4439-869c-7e58a10691de.png">

A now-removed additional commit raised the bar for non-cooperative log
truncation to the configured max range size while a paused follower is present.
In theory, this should significantly cut down on the number of replicas needing
snapshots, but in practice the experiments show that whenever the follower
de-loads for a brief moment (as it frequently does in this experiment since the
overload is entirely driven by raft traffic), a non-cooperative truncation
using the old threshold occurs and we lose the benefit of having delayed the
truncation in the first place. That attempt still helped a little bit (making
it somewhat less likely for the IOThreshold score to crack 1.0) but in this PR,
this approach was extended to pause truncation *before* the follower enters
the pausable regime, and to do so regardless of the pausing status of the
follower. This operates on the assumption that if a follower is permanently
overloaded, it will never fall too far below the pausable threshold, and so we
would be delaying snapshots as long as this could possibly make sense.

This has a few potential downsides: First, retaining the raft log for
extended periods of time causes higher LSM write amp. I think this is
not a big problem - the old threshold, 8mb of log per replica, is
already large enough to have most log entries flushed to L0, and so the
bulk of the write amplification will be incurred either way. Second,
many large raft logs might run the cluster out of disk; it would be
better to have an explicit trade-off between delaying truncations
and that but at present there is no such mechanism. Third, there
might be a performance penalty on the healthy nodes due to lower
cache efficiencies both in pebble and the raft entry cache. And
fourth, catching up on many very long raft logs at the same time
could trigger known weaknesses in the raft transport related to memory
usage with increased likelihood.

Fixes #79215.

[experiments]: #81516


[^1]: #79215 (comment)
[^2]: #83920 (comment)

Release note (ops change): the `admission.kv.pause_replication_io_threshold`
cluster setting can be set to a nonzero value to reduce I/O throughput on
followers that are driven towards an inverted LSM by replication traffic. The
functionality is disabled by default. A suggested value is 0.8, meaning that
replication traffic to nonessential followers is paused before these followers
will begin throttling their foreground traffic.


Co-authored-by: Tobias Grieger <[email protected]>
@craig craig bot closed this as completed in 43a37d5 Jul 15, 2022
tbg added a commit to tbg/cockroach that referenced this issue Aug 11, 2022
This is a less ad-hoc version of the experiment in cockroachdb#81289, where I
messed with the EBS configuration. This can't be done programmatically,
and so here we use an IO nemesis on n3 instead.

Part of cockroachdb#79215.
Closes cockroachdb#81834.

Release note: None
tbg added a commit to tbg/cockroach that referenced this issue Aug 18, 2022
@irfansharif
Contributor

X-linking #95563, which is orthogonal to follower pausing and a form of follower write control.

irfansharif added a commit to irfansharif/cockroach that referenced this issue Jan 23, 2023
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeable with large index backfills (cockroachdb#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of cockroachdb#79215, settling on cockroachdb#83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of cockroachdb#95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect to no longer need to bypass follower write work in admission
control and solve the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this first,
proto{col,buf}/interface-only PR, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, held at the node-level and holds all
     kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store
     we're sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will be held at the replica-level (only
     on those who are both leaseholder and raft leader), and will be
     used to interface with the node-level kvflowcontrol.Controller.
     When replicating log entries, these replicas choose the log
     position (term+index) the data is to end up at, and use this handle
     to track the token deductions on a per log position basis. Later
     when freeing up tokens (after being informed of said log entries
     being admitted on the receiving end of the stream), it's done so by
     specifying the log position up to which we free up all deducted
     tokens.

   type Controller interface {
     Admit(admissionpb.WorkPriority, ...Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
     ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
     TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
     Close()
   }

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, who wants to
   eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in raft
   entry (admission_origin_node from above) to know where to
   send these to. This information is used on the origin node to release
   flow tokens that were acquired when replicating the original log
   entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) are waiting to be returned. The interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a
   prefix byte that informs decoding routines how to interpret the
   subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry section below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.

6. An unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData)
   to use replication admission control. Since we're using a different
   prefix byte for raft commands (see EntryEncodings above), one not
   recognized in earlier CRDB versions, we need explicit versioning.

7. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission pri. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
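
The per-log-position token tracking described for kvflowcontrol.Handle can be sketched as below. This is a minimal illustration for a single stream and a single priority; the lowercase names (`handle`, `logPosition`, `canAdmit`) are hypothetical, and the non-blocking `canAdmit` is a stand-in for the blocking `Admit` in the interface.

```go
package main

import "fmt"

// logPosition is a simplified stand-in for kvflowcontrolpb.RaftLogPosition.
type logPosition struct{ term, index uint64 }

// lessEq orders positions by term first, then by index.
func (p logPosition) lessEq(o logPosition) bool {
	return p.term < o.term || (p.term == o.term && p.index <= o.index)
}

// tracked records one token deduction at a given log position.
type tracked struct {
	pos    logPosition
	tokens int64
}

// handle tracks flow tokens for one stream and one priority (assumption:
// the real Handle covers several streams and priorities).
type handle struct {
	available int64
	deducted  []tracked // appended in increasing log position order
}

// canAdmit reports whether new work may proceed.
func (h *handle) canAdmit() bool { return h.available > 0 }

// deductTokensFor deducts tokens and remembers the log position they belong to.
func (h *handle) deductTokensFor(pos logPosition, tokens int64) {
	h.available -= tokens
	h.deducted = append(h.deducted, tracked{pos, tokens})
}

// returnTokensUpto frees all deductions at or below the given position and
// returns how many tokens were released.
func (h *handle) returnTokensUpto(upto logPosition) int64 {
	var returned int64
	i := 0
	for ; i < len(h.deducted) && h.deducted[i].pos.lessEq(upto); i++ {
		returned += h.deducted[i].tokens
	}
	h.deducted = h.deducted[i:]
	h.available += returned
	return returned
}

func main() {
	h := &handle{available: 100}
	h.deductTokensFor(logPosition{1, 1}, 40)
	h.deductTokensFor(logPosition{1, 2}, 40)
	h.deductTokensFor(logPosition{1, 3}, 40)
	fmt.Println(h.canAdmit(), h.available)
	fmt.Println(h.returnTokensUpto(logPosition{1, 2}), h.available)
}
```

Returning tokens by position rather than by individual entry means a single "admitted up to" message from the receiving store can release many deductions at once.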
irfansharif added a commit to irfansharif/cockroach that referenced this issue Jan 24, 2023
irfansharif added a commit to irfansharif/cockroach that referenced this issue Jan 25, 2023
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeable with large index backfills (cockroachdb#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of cockroachdb#79215, settling on cockroachdb#83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of cockroachdb#95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect to no longer need to bypass follower write work in admission
control and solve the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this
proto{col,buf}/interface-only commit and the previous raft log encoding
commit, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, held at the node-level and holds all
     kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store
     we're sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will be held at the replica-level (only
     on those who are both leaseholder and raft leader), and will be
     used to interface with the node-level kvflowcontrol.Controller.
     When replicating log entries, these replicas choose the log
     position (term+index) the data is to end up at, and use this handle
     to track the token deductions on a per log position basis. Later
     when freeing up tokens (after being informed of said log entries
     being admitted on the receiving end of the stream), it's done so by
     specifying the log position up to which we free up all deducted
     tokens.

   type Controller interface {
     Admit(admissionpb.WorkPriority, ...Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
     ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
     TrackLowWater(kvflowcontrolpb.RaftLogPosition, Stream)
     Close()
   }

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, who wants to
   eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in raft
   entry (admission_origin_node from above) to know where to
   send these to, which in turn will release flow tokens that were
   acquired when replicating the original log entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) are waiting to be returned. The interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a
   prefix byte that informs decoding routines how to interpret the
   subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry section below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.

6. An unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData)
   to use replication admission control. Since we're using a different
   prefix byte for raft commands (see EntryEncodings above), one not
   recognized in earlier CRDB versions, we need explicit versioning.

7. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission pri. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)

Release note: None
irfansharif added a commit to irfansharif/cockroach that referenced this issue Jan 25, 2023
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeable with large index backfills (cockroachdb#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of cockroachdb#79215, settling on cockroachdb#83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of cockroachdb#95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect to no longer need to bypass follower write work in admission
control and solve the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this
proto{col,buf}/interface-only commit and the previous raft log encoding
commit, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, which is held at the node level and holds
     all kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store
     we're sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will be held at the replica level (only
     on those that are both leaseholder and raft leader), and will be
     used to interface with the node-level kvflowcontrol.Controller.
     When replicating log entries, these replicas choose the log
     position (term+index) the data is to end up at, and use this handle
     to track the token deductions on a per log position basis. Later
     when freeing up tokens (after being informed of said log entries
     being admitted on the receiving end of the stream), it's done so by
     specifying the log position up to which we free up all deducted
     tokens.

   type Controller interface {
     Admit(admissionpb.WorkPriority, ...Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
     ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
     TrackLowWater(kvflowcontrolpb.RaftLogPosition, Stream)
     Close()
   }

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, who wants to
   eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in the raft
   entry (admission_origin_node from above) to know where to
   send these to, which in turn will release flow tokens that were
   acquired when replicating the original log entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) are waiting to be returned. The interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a
   prefix byte that informs decoding routines how to interpret the
   subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry section below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.

6. An unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData)
   to use replication admission control. Since we're using a different
   prefix byte for raft commands (see EntryEncodings above), one not
   recognized in earlier CRDB versions, we need explicit versioning.

7. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in the underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission pri. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)

Release note: None
irfansharif added a commit to irfansharif/cockroach that referenced this issue Jan 27, 2023
Follower replication work, today, is not subject to admission control.
It consumes IO tokens without waiting, which both (i) does not prevent
the LSM from being inverted, and (ii) can cause priority inversion where
low-pri follower write work ends up causing IO token exhaustion, which
in turn causes throughput and latency impact for high-pri non-follower
write work on that same store. This latter behavior was especially
noticeable with large index backfills (cockroachdb#82556) where >2/3rds of write
traffic on stores could be follower work for large AddSSTs, causing IO
token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of cockroachdb#79215, settling on cockroachdb#83851
which pauses replication traffic to stores close to exceeding their IO
overload threshold (data that's periodically gossiped). In large index
backfill experiments we found this to help slightly, but it's still a
coarse and imperfect solution -- we're deliberately causing
under-replication instead of being able to shape the rate of incoming
writes for low-pri work closer to the origin.

As part of cockroachdb#95563 we're introducing machinery for "replication admission
control" -- end-to-end flow control for replication traffic. With it we
expect to no longer need to bypass follower write work in admission
control and solve the issues mentioned above. Some small degree of
familiarity with the design is assumed below. In this
proto{col,buf}/interface-only commit and the previous raft log encoding
commit, we introduce:

1. Package kvflowcontrol{,pb}, which will provide flow control for
   replication traffic in KV. It will be part of the integration layer
   between KV and admission control. In it we have a few central
   interfaces:

   - kvflowcontrol.Controller, which is held at the node level and holds
     all kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store
     we're sending raft traffic to and tenant we're sending it for).
   - kvflowcontrol.Handle, which will be held at the replica level (only
     on those that are both leaseholder and raft leader), and will be
     used to interface with the node-level kvflowcontrol.Controller.
     When replicating log entries, these replicas choose the log
     position (term+index) the data is to end up at, and use this handle
     to track the token deductions on a per log position basis. Later
     when freeing up tokens (after being informed of said log entries
     being admitted on the receiving end of the stream), it's done so by
     specifying the log position up to which we free up all deducted
     tokens.

   type Controller interface {
     Admit(admissionpb.WorkPriority, time.Time, Stream)
     DeductTokens(admissionpb.WorkPriority, Tokens, Stream) (deducted bool)
     ReturnTokens(admissionpb.WorkPriority, Tokens, Stream)
   }

   type Handle interface {
     Admit(admissionpb.WorkPriority, time.Time)
     DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
     DeductedTokensUpto(Stream) kvflowcontrolpb.RaftLogPosition
     ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Stream)
     ReturnAllTokensUpto(kvflowcontrolpb.RaftLogPosition, Stream)
     Close()
   }

2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding
   routines. RaftAdmissionMeta is 'embedded' within a
   kvserverpb.RaftCommand, and includes necessary AC metadata on a per
   raft entry basis. Entries that contain this metadata will make use of
   the AC-specific raft log entry encodings described earlier. The AC
   metadata is decoded below-raft when looking to admit the write work.
   Also included is the node where this command originated, who wants to
   eventually learn of this command's admission.

   message RaftAdmissionMeta {
     int32 admission_priority = ...;
     int64 admission_create_time = ...;
     int32 admission_origin_node = ...;
   }

3. kvflowcontrolpb.AdmittedRaftLogEntries, which now features in
   kvserverpb.RaftMessageRequest, the unit of what's sent
   back-and-forth between two nodes over their two uni-directional raft
   transport streams. AdmittedRaftLogEntries, just like raft
   heartbeats, is coalesced information about all raft log entries that
   were admitted below raft. We'll use the origin node encoded in the raft
   entry (admission_origin_node from above) to know where to
   send these to, which in turn will release flow tokens that were
   acquired when replicating the original log entries.

   message AdmittedRaftLogEntries {
     int64 range_id = ...;
     int32 admission_priority = ...;
     RaftLogPosition up_to_raft_log_position = ...;
     uint64 store_id = ...;
   }

   message RaftLogPosition {
     uint64 term = ...;
     uint64 index = ...;
   }

4. kvflowcontrol.Dispatch, which is used to dispatch information about
   admitted raft log entries (see AdmittedRaftLogEntries from above) to
   specific nodes where (i) said entries originated, (ii) flow tokens
   were deducted and (iii) are waiting to be returned. The interface is
   also used to read pending dispatches, which will be used in the raft
   transport layer when looking to piggyback information on traffic
   already bound to specific nodes. Since timely dispatching (read:
   piggybacking) is not guaranteed, we allow querying for all
   long-overdue dispatches. The interface looks roughly like:

   type Dispatch interface {
     Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
     PendingDispatch() []roachpb.NodeID
     PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
   }

5. Two new encodings for raft log entries,
   EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a
   prefix byte that informs decoding routines how to interpret the
   subsequent bytes. To date we've had two,
   EntryEncoding{Standard,Sideloaded} (now renamed to
   EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether
   the entry came with sideloaded data (these are typically AddSSTs, the
   storage for which is treated differently for performance). Our two
   additions here will be used to indicate whether the particular entry
   is subject to replication admission control. If so, right as we
   persist entries into the raft log storage, we'll admit the work
   without blocking.
   - We'll come back to this non-blocking admission in the
     AdmitRaftEntry description below, even though the implementation is
     left for a future PR.
   - The decision to use replication admission control happens above
     raft, and AC-specific metadata is plumbed down as part of the
     marshaled raft command, as described for RaftAdmissionMeta above.

6. AdmitRaftEntry, on the kvadmission.Controller interface. We'll
   use this as the integration point for log entries received below
   raft, right as they're being written to storage. This will be
   non-blocking since we'll be below raft in the raft.Ready() loop,
   and will effectively enqueue a "virtual" work item in the underlying
   StoreWorkQueue mediating store IO. This virtual work item is what
   later gets dequeued once the store granter informs the work queue of
   newly available IO tokens. For standard work queue ordering, our work
   item needs to include the create time and admission pri. The tenant
   ID is plumbed to find the right tenant heap to queue it under (for
   inter-tenant isolation); the store ID to find the right store work
   queue on multi-store nodes. The raftpb.Entry encodes within it its
   origin node (see RaftAdmissionMeta above), which is used
   post-admission to inform the right node of said admission. It looks
   like:

   // AdmitRaftEntry informs admission control of a raft log entry being
   // written to storage.
   AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)

Release note: None
craig bot pushed a commit that referenced this issue Jan 27, 2023
95637: kvflowcontrol,raftlog: interfaces for replication control r=irfansharif a=irfansharif

Follower replication work, today, is not subject to admission control. It consumes IO tokens without waiting, which both (i) does not prevent the LSM from being inverted, and (ii) can cause priority inversion where low-pri follower write work ends up causing IO token exhaustion, which in turn causes throughput and latency impact for high-pri non-follower write work on that same store. This latter behavior was especially noticeable with large index backfills (#82556) where >2/3rds of write traffic on stores could be follower work for large AddSSTs, causing IO token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of #79215, settling on #83851 which pauses replication traffic to stores close to exceeding their IO overload threshold (data that's periodically gossiped). In large index backfill experiments we found this to help slightly, but it's still a coarse and imperfect solution -- we're deliberately causing under-replication instead of being able to shape the rate of incoming writes for low-pri work closer to the origin.

As part of #95563 we're introducing machinery for "replication admission control" -- end-to-end flow control for replication traffic. With it we expect to no longer need to bypass follower write work in admission control and solve the issues mentioned above. Some small degree of familiarity with the design is assumed below. In this first, proto{col,buf}/interface-only PR, we introduce:

1. Package `kvflowcontrol{,pb}`, which will provide flow control for replication traffic in KV. It will be part of the integration layer between KV and admission control. In it we have a few central interfaces:

   - `kvflowcontrol.Controller`, which is held at the node level and holds all `kvflowcontrol.Tokens` for each `kvflowcontrol.Stream` (one per store we're sending raft traffic to and tenant we're sending it for).
   - `kvflowcontrol.Handle`, which will be held at the replica level (only on those that are both leaseholder and raft leader), and will be used to interface with the node-level `kvflowcontrol.Controller`. When replicating log entries, these replicas choose the log position (term+index) the data is to end up at, and use this handle to track the token deductions on a per log position basis. Later when freeing up tokens (after being informed of said log entries being admitted on the receiving end of the stream), it's done so by specifying the log position up to which we free up all deducted tokens.
   
```go
type Controller interface {
  Admit(admissionpb.WorkPriority, ...Stream)
  DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
  ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
}

type Handle interface {
  Admit(admissionpb.WorkPriority)
  DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
  ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
  TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
  Close()
}
```
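To make the per-log-position bookkeeping concrete, here is a minimal sketch of how deductions might be tracked and later freed up to a given position. The `tracker` type, its fields, and its method names are hypothetical and are not the `kvflowcontrol` implementation; priorities and multiple streams are left out to keep it short. Only the idea of deducting at a (term, index) and returning everything up to a position comes from the description above.

```go
package main

import "fmt"

// LogPosition mirrors the (term, index) pair described above.
type LogPosition struct {
	Term, Index uint64
}

// lessEq reports whether p sorts at or before q (term first, then index).
func (p LogPosition) lessEq(q LogPosition) bool {
	if p.Term != q.Term {
		return p.Term < q.Term
	}
	return p.Index <= q.Index
}

// deduction records how many tokens were deducted for one log position.
type deduction struct {
	pos    LogPosition
	tokens int64
}

// tracker is a hypothetical stand-in for the bookkeeping of a single stream.
type tracker struct {
	available int64
	deducted  []deduction // appended in increasing log position order
}

// deductTokensFor deducts tokens and remembers which position they were for.
func (t *tracker) deductTokensFor(pos LogPosition, tokens int64) {
	t.available -= tokens
	t.deducted = append(t.deducted, deduction{pos: pos, tokens: tokens})
}

// returnTokensUpto frees every deduction at or below pos and reports the
// total number of tokens returned.
func (t *tracker) returnTokensUpto(pos LogPosition) int64 {
	var returned int64
	i := 0
	for ; i < len(t.deducted) && t.deducted[i].pos.lessEq(pos); i++ {
		returned += t.deducted[i].tokens
	}
	t.deducted = t.deducted[i:]
	t.available += returned
	return returned
}

func main() {
	t := &tracker{available: 100}
	t.deductTokensFor(LogPosition{Term: 1, Index: 10}, 20)
	t.deductTokensFor(LogPosition{Term: 1, Index: 11}, 30)
	returned := t.returnTokensUpto(LogPosition{Term: 1, Index: 10})
	fmt.Println("returned:", returned, "available:", t.available)
}
```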
2. `kvflowcontrolpb.RaftAdmissionMeta` and relevant encoding/decoding routines. `RaftAdmissionMeta` is 'embedded' within a `kvserverpb.RaftCommand`, and includes necessary AC metadata on a per raft entry basis. Entries that contain this metadata will make use of the AC-specific raft log entry encodings described earlier. The AC metadata is decoded below-raft when looking to admit the write work. Also included is the node where this command originated, who wants to eventually learn of this command's admission.
```proto
message RaftAdmissionMeta {
  int32 admission_priority = ...;
  int64 admission_create_time = ...;
  int32 admission_origin_node = ...;
}
```
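As a rough illustration of how a prefix byte can tell below-raft code whether an entry carries admission metadata, consider the sketch below. Everything in it is an assumption made for illustration: the prefix byte values are made up, and the fixed-width big-endian layout stands in for the marshaled proto that the real code embeds in `kvserverpb.RaftCommand`.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Hypothetical prefix byte values; the real values are not given in the text.
const (
	prefixStandardWithoutAC byte = 0
	prefixStandardWithAC    byte = 1
)

// raftAdmissionMeta mirrors the three fields of RaftAdmissionMeta.
type raftAdmissionMeta struct {
	Priority   int32
	CreateTime int64
	OriginNode int32
}

const metaLen = 4 + 8 + 4

// encodeWithAC lays out: prefix byte, fixed-width metadata, then the payload.
func encodeWithAC(m raftAdmissionMeta, payload []byte) []byte {
	buf := make([]byte, 1+metaLen, 1+metaLen+len(payload))
	buf[0] = prefixStandardWithAC
	binary.BigEndian.PutUint32(buf[1:5], uint32(m.Priority))
	binary.BigEndian.PutUint64(buf[5:13], uint64(m.CreateTime))
	binary.BigEndian.PutUint32(buf[13:17], uint32(m.OriginNode))
	return append(buf, payload...)
}

// decodeMeta returns the metadata if the prefix byte says the entry is
// subject to admission control; ok is false for other encodings.
func decodeMeta(entry []byte) (m raftAdmissionMeta, ok bool, err error) {
	if len(entry) == 0 {
		return m, false, errors.New("empty entry")
	}
	if entry[0] != prefixStandardWithAC {
		return m, false, nil
	}
	if len(entry) < 1+metaLen {
		return m, false, errors.New("truncated admission metadata")
	}
	m.Priority = int32(binary.BigEndian.Uint32(entry[1:5]))
	m.CreateTime = int64(binary.BigEndian.Uint64(entry[5:13]))
	m.OriginNode = int32(binary.BigEndian.Uint32(entry[13:17]))
	return m, true, nil
}

func main() {
	enc := encodeWithAC(raftAdmissionMeta{Priority: -30, CreateTime: 42, OriginNode: 3}, []byte("cmd"))
	m, ok, err := decodeMeta(enc)
	fmt.Println(m, ok, err)
}
```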
3. `kvflowcontrolpb.AdmittedRaftLogEntries`, which now features in `kvserverpb.RaftMessageRequest`, the unit of what's sent back-and-forth between two nodes over their two uni-directional raft transport streams. `AdmittedRaftLogEntries`, just like raft heartbeats, is coalesced information about all raft log entries that were admitted below raft. We'll use the origin node encoded in the raft entry (`admission_origin_node` from 2. above) to know where to send these to. This information is used on the origin node to release flow tokens that were acquired when replicating the original log entries.
```proto
message AdmittedRaftLogEntries {
  int64 range_id = ...;
  int32 admission_priority = ...;
  RaftLogPosition up_to_raft_log_position = ...;
  uint64 store_id = ...;
}

message RaftLogPosition {
  uint64 term = ...;
  uint64 index = ...;
}
```
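One way to read "coalesced" here: because each message says "admitted up to this log position", a later position for the same range, priority and store subsumes earlier ones. The sketch below works under that assumption. The `coalesce` function and `key` type are hypothetical, and the struct only mirrors the proto fields shown above.

```go
package main

import "fmt"

// RaftLogPosition mirrors the proto message above.
type RaftLogPosition struct {
	Term, Index uint64
}

// less orders positions by term first, then index.
func (p RaftLogPosition) less(q RaftLogPosition) bool {
	if p.Term != q.Term {
		return p.Term < q.Term
	}
	return p.Index < q.Index
}

// AdmittedRaftLogEntries mirrors the proto message above.
type AdmittedRaftLogEntries struct {
	RangeID             int64
	AdmissionPriority   int32
	UpToRaftLogPosition RaftLogPosition
	StoreID             uint64
}

// key identifies what a single "admitted up to" position applies to.
type key struct {
	rangeID  int64
	priority int32
	storeID  uint64
}

// coalesce keeps only the highest admitted position per (range, priority,
// store), assuming a later position subsumes all earlier ones.
func coalesce(in []AdmittedRaftLogEntries) map[key]RaftLogPosition {
	out := make(map[key]RaftLogPosition)
	for _, e := range in {
		k := key{rangeID: e.RangeID, priority: e.AdmissionPriority, storeID: e.StoreID}
		if cur, ok := out[k]; !ok || cur.less(e.UpToRaftLogPosition) {
			out[k] = e.UpToRaftLogPosition
		}
	}
	return out
}

func main() {
	out := coalesce([]AdmittedRaftLogEntries{
		{RangeID: 7, AdmissionPriority: 0, UpToRaftLogPosition: RaftLogPosition{Term: 1, Index: 10}, StoreID: 2},
		{RangeID: 7, AdmissionPriority: 0, UpToRaftLogPosition: RaftLogPosition{Term: 1, Index: 12}, StoreID: 2},
	})
	fmt.Println(len(out), out[key{rangeID: 7, priority: 0, storeID: 2}])
}
```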
4. `kvflowcontrol.Dispatch`, which is used to dispatch information about admitted raft log entries (see 3. from above) to specific nodes where (i) said entries originated, (ii) flow tokens were deducted and (iii) are waiting to be returned. The interface is also used to read pending dispatches, which will be used in the raft transport layer when looking to piggyback information on traffic already bound to specific nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we allow querying for all long-overdue dispatches. The interface looks roughly like:
```go
type Dispatch interface {
  Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
  PendingDispatch() []roachpb.NodeID
  PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
}
```
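A minimal in-memory sketch of this interface might look as follows. It is not the actual implementation: the `nodeID` and `admittedEntries` types are simplified stand-ins for `roachpb.NodeID` and `kvflowcontrolpb.AdmittedRaftLogEntries`, and the choice to have `PendingDispatchFor` drain what it returns is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// nodeID and admittedEntries are simplified stand-ins for the real types.
type nodeID int32

type admittedEntries struct {
	rangeID  int64
	priority int32
	term     uint64
	index    uint64
	storeID  uint64
}

// dispatch buffers admitted-entry notifications per destination node until
// the transport layer is ready to piggyback them on outgoing traffic.
type dispatch struct {
	mu      sync.Mutex
	pending map[nodeID][]admittedEntries
}

func newDispatch() *dispatch {
	return &dispatch{pending: make(map[nodeID][]admittedEntries)}
}

// Dispatch queues a notification for the given node.
func (d *dispatch) Dispatch(n nodeID, e admittedEntries) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.pending[n] = append(d.pending[n], e)
}

// PendingDispatch lists the nodes that have notifications waiting, in
// sorted order so that callers see a deterministic result.
func (d *dispatch) PendingDispatch() []nodeID {
	d.mu.Lock()
	defer d.mu.Unlock()
	nodes := make([]nodeID, 0, len(d.pending))
	for n := range d.pending {
		nodes = append(nodes, n)
	}
	sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
	return nodes
}

// PendingDispatchFor returns and clears the notifications for one node
// (draining on read is an assumption of this sketch).
func (d *dispatch) PendingDispatchFor(n nodeID) []admittedEntries {
	d.mu.Lock()
	defer d.mu.Unlock()
	entries := d.pending[n]
	delete(d.pending, n)
	return entries
}

func main() {
	d := newDispatch()
	d.Dispatch(2, admittedEntries{rangeID: 7, term: 1, index: 10, storeID: 2})
	for _, n := range d.PendingDispatch() {
		fmt.Println("node", n, "pending", len(d.PendingDispatchFor(n)))
	}
}
```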
5. Two new encodings for raft log entries, `EntryEncoding{Standard,Sideloaded}WithAC`. Raft log entries have a prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, `EntryEncoding{Standard,Sideloaded}` (now renamed to `EntryEncoding{Standard,Sideloaded}WithoutAC`), to indicate whether the entry came with sideloaded data (these are typically AddSSTs, the storage for which is treated differently for performance). Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll admit the work without blocking.
    - We'll come back to this non-blocking admission in 7. below, even though the implementation is left for a future PR.
    - The decision to use replication admission control happens above raft, and AC-specific metadata is plumbed down as part of the marshaled raft command, as described in 2. above.

6. An unused version gate (`V23_1UseEncodingWithBelowRaftAdmissionData`) to use replication admission control. Since we're using a different prefix byte for raft commands (point 5. above), one not recognized in earlier CRDB versions, we need explicit versioning.

7. `AdmitRaftEntry`, on the `kvadmission.Controller` interface. We'll use this as the integration point for log entries received below raft, right as they're being written to storage. This will be non-blocking since we'll be below raft in the `raft.Ready()` loop, and will effectively enqueue a "virtual" work item in the underlying `StoreWorkQueue` mediating store IO. This virtual work item is what later gets dequeued once the store granter informs the work queue of newly available IO tokens. For standard work queue ordering, our work item needs to include the create time and admission pri. The tenant ID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation); the store ID to find the right store work queue on multi-store nodes. The `raftpb.Entry` encodes within it its origin node (see 2. from above), which is used post-admission to inform the right node of said admission. It looks like:
```go
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage.
AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
```
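
The non-blocking flow in 7. can be sketched with a toy queue: admission only enqueues a virtual work item, and items are dequeued later, in priority then create-time order, as IO tokens are granted. Everything here (`workItem`, `storeQueue`, `admitRaftEntry`, `grant`, the `onAdmit` callback) is a hypothetical simplification for illustration; per-tenant heaps and the real `StoreWorkQueue` and granter are not modeled.

```go
package main

import (
	"container/heap"
	"fmt"
)

// workItem is a hypothetical "virtual" work item for one raft log entry.
type workItem struct {
	priority   int    // higher priority is admitted first
	createTime int64  // earlier create time wins among equal priority
	tokens     int64  // IO tokens the entry needs (e.g. its size)
	originNode int    // node to inform once the entry is admitted
	index      uint64 // raft log index of the entry
}

// workHeap orders items by priority, then create time.
type workHeap []workItem

func (h workHeap) Len() int { return len(h) }
func (h workHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority
	}
	return h[i].createTime < h[j].createTime
}
func (h workHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *workHeap) Push(x any)   { *h = append(*h, x.(workItem)) }
func (h *workHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// storeQueue is a toy, single-tenant stand-in for a store work queue.
type storeQueue struct {
	available int64
	waiting   workHeap
	onAdmit   func(workItem) // e.g. notify the origin node
}

// admitRaftEntry never blocks: it only enqueues the virtual work item.
func (q *storeQueue) admitRaftEntry(it workItem) {
	heap.Push(&q.waiting, it)
}

// grant adds IO tokens and dequeues as many waiting items as they cover.
func (q *storeQueue) grant(tokens int64) {
	q.available += tokens
	for q.waiting.Len() > 0 && q.available >= q.waiting[0].tokens {
		it := heap.Pop(&q.waiting).(workItem)
		q.available -= it.tokens
		q.onAdmit(it)
	}
}

func main() {
	q := &storeQueue{onAdmit: func(it workItem) {
		fmt.Printf("admitted index=%d, informing node %d\n", it.index, it.originNode)
	}}
	q.admitRaftEntry(workItem{priority: 1, createTime: 10, tokens: 5, originNode: 1, index: 1})
	q.admitRaftEntry(workItem{priority: 2, createTime: 20, tokens: 5, originNode: 2, index: 2})
	q.grant(10)
}
```

Because `admitRaftEntry` returns immediately, the caller in the raft ready loop is never stalled; back-pressure would instead come from the origin node waiting to hear about admission.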

---

Here's how the various pieces fit together:

<img width="870" alt="image" src="https://user-images.githubusercontent.com/10536690/214457338-1521c94b-7d9a-4d50-8a29-80984d1a706a.png">



95715: bench: add separate-process tenant benchmarks r=yuzefovich a=yuzefovich

Some time ago we merged a change to run all benchmarks in `pkg/bench` against an in-memory tenant. Recently we introduced a local fastpath for in-process in-memory tenants which will resemble how things will look once we get to UA for single-tenant clusters. However, it is also interesting to measure how we perform with separate-process tenants (which is how we run serverless), so this commit introduces that additional config.

Epic: CRDB-14837

Release note: None

96027: ci: get rid of failures in make-based CI r=healthy-pod a=rickystewart

1. We don't post to GitHub from `make`-based CI jobs any more, and
   `github-post` doesn't build cleanly from `make` any more. I delete
   all the code related to `github-post` from `run_json_test`.
2. Delete `teamcity-check-genfiles.sh` which doesn't serve a purpose now
   that `vendor` is gone.

Epic: none
Release note: None

Co-authored-by: irfan sharif
Co-authored-by: Yahor Yuzefovich
Co-authored-by: Ricky Stewart