95637: kvflowcontrol,raftlog: interfaces for replication control r=irfansharif a=irfansharif

Follower replication work, today, is not subject to admission control. It consumes IO tokens without waiting, which both (i) does not prevent the LSM from being inverted, and (ii) can cause priority inversion where low-pri follower write work ends up causing IO token exhaustion, which in turn causes throughput and latency impact for high-pri non-follower write work on that same store. This latter behavior was especially noticeable with large index backfills (#82556), where >2/3rds of the write traffic on stores could be follower work for large AddSSTs, causing IO token exhaustion for regular write work being proposed on those stores.

We last looked at this problem as part of #79215, settling on #83851 which pauses replication traffic to stores close to exceeding their IO overload threshold (data that's periodically gossiped). In large index backfill experiments we found this to help slightly, but it's still a coarse and imperfect solution -- we're deliberately causing under-replication instead of being able to shape the rate of incoming writes for low-pri work closer to the origin.

As part of #95563 we're introducing machinery for "replication admission control" -- end-to-end flow control for replication traffic. With it we expect to no longer need to bypass follower write work in admission control and solve the issues mentioned above. Some small degree of familiarity with the design is assumed below. In this first, proto{col,buf}/interface-only PR, we introduce:

1. Package `kvflowcontrol{,pb}`, which will provide flow control for replication traffic in KV. It will be part of the integration layer between KV and admission control. In it we have a few central interfaces:

   - `kvflowcontrol.Controller`, held at the node level, which holds all `kvflowcontrol.Tokens` for each `kvflowcontrol.Stream` (one per store we're sending raft traffic to and tenant we're sending it for).
   - `kvflowcontrol.Handle`, which will be held at the replica level (only on replicas that are both the leaseholder and the raft leader), and will be used to interface with the node-level `kvflowcontrol.Controller`. When replicating log entries, these replicas choose the log position (term+index) the data is to end up at, and use this handle to track token deductions on a per-log-position basis. Later, when freeing up tokens (after being informed that said log entries were admitted on the receiving end of the stream), they do so by specifying the log position up to which all deducted tokens are released.
   
```go
type Controller interface {
  Admit(admissionpb.WorkPriority, ...Stream)
  DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream)
  ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream)
}

type Handle interface {
  Admit(admissionpb.WorkPriority)
  DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens)
  ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition)
  TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition)
  Close()
}
```
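To make the intended lifecycle concrete, here's a minimal sketch of how a leaseholder+leader replica might drive a `Handle`; the function shape, token count, and log position below are illustrative stand-ins, and only the interface methods come from this PR:
```go
// proposeWithFlowControl sketches the Handle lifecycle when replicating a
// ~1 MiB write. Hypothetical; the real integration comes in later PRs.
func proposeWithFlowControl(h kvflowcontrol.Handle, pri admissionpb.WorkPriority) {
	// Block until flow tokens are available for all streams this replica
	// is replicating over.
	h.Admit(pri)

	// The leaseholder+leader picks the log position (term+index) the
	// entry will end up at, and tracks the deduction against it.
	pos := kvflowcontrolpb.RaftLogPosition{Term: 5, Index: 42}
	h.DeductTokensFor(pri, pos, kvflowcontrol.Tokens(1<<20))

	// ... the entry is replicated and eventually admitted below-raft on
	// the receiving end; once informed, we return everything deducted up
	// to (and including) that log position.
	h.ReturnTokensUpto(pri, pos)
}
```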
2. `kvflowcontrolpb.RaftAdmissionMeta` and the relevant encoding/decoding routines. `RaftAdmissionMeta` is 'embedded' within a `kvserverpb.RaftCommand` and includes the necessary AC metadata on a per-raft-entry basis. Entries that contain this metadata will make use of the AC-specific raft log entry encodings described in 5. below. The AC metadata is decoded below-raft when looking to admit the write work. Also included is the node where the command originated, which will eventually want to learn of the command's admission. A hedged construction sketch follows the proto.
```proto
message RaftAdmissionMeta {
  int32 admission_priority = ...;
  int64 admission_create_time = ...;
  int32 admission_origin_node = ...;
}
```
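As a sketch of the proposer side — the generated Go field names are assumed from the proto above, and the helper itself is hypothetical:
```go
// makeAdmissionMeta populates the AC metadata embedded in a RaftCommand.
func makeAdmissionMeta(
	pri admissionpb.WorkPriority, origin roachpb.NodeID,
) kvflowcontrolpb.RaftAdmissionMeta {
	return kvflowcontrolpb.RaftAdmissionMeta{
		AdmissionPriority:   int32(pri),
		AdmissionCreateTime: timeutil.Now().UnixNano(), // used for FIFO ordering within a priority
		AdmissionOriginNode: int32(origin),             // the node to notify post-admission
	}
}
```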
3. `kvflowcontrolpb.AdmittedRaftLogEntries`, which now features in `kvserverpb.RaftMessageRequest`, the unit of what's sent back and forth between two nodes over their two uni-directional raft transport streams. `AdmittedRaftLogEntries`, just like raft heartbeats, is coalesced information about all raft log entries that were admitted below raft. We'll use the origin node encoded in the raft entry (`admission_origin_node` from 2. above) to know where to send these. This information is then used on the origin node to release the flow tokens that were acquired when replicating the original log entries. A construction sketch follows the protos.
```proto
message AdmittedRaftLogEntries {
  int64 range_id = ...;
  int32 admission_priority = ...;
  RaftLogPosition up_to_raft_log_position = ...;
  uint64 store_id = ...;
}

message RaftLogPosition {
  uint64 term = ...;
  uint64 index = ...;
}
```
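For illustration (again assuming the generated field names), a below-raft admission of everything up to term=5/index=42 at normal priority, for range r10 as replicated to store s2, would coalesce into:
```go
admitted := kvflowcontrolpb.AdmittedRaftLogEntries{
	RangeID:           10,
	AdmissionPriority: int32(admissionpb.NormalPri),
	UpToRaftLogPosition: kvflowcontrolpb.RaftLogPosition{
		Term:  5,
		Index: 42,
	},
	StoreID: 2,
}
```
Back on the origin node, this releases all tokens deducted at that priority up to the given position for the corresponding stream.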
4. `kvflowcontrol.Dispatch`, which is used to dispatch information about admitted raft log entries (see 3. above) to the specific nodes where (i) said entries originated, (ii) flow tokens were deducted, and (iii) those tokens are waiting to be returned. The interface is also used to read pending dispatches, which the raft transport layer will use when looking to piggyback this information on traffic already bound for specific nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we also allow querying for all long-overdue dispatches. The interface looks roughly like:
```go
type Dispatch interface {
  Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
  PendingDispatch() []roachpb.NodeID
  PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
}
```
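A rough sketch of the intended piggybacking loop in the raft transport, where `sendAlongRaftTransport` is a stand-in for attaching the payload to an outbound `kvserverpb.RaftMessageRequest`:
```go
// flushPendingDispatches piggybacks coalesced admission information onto
// raft traffic already bound for the relevant nodes.
func flushPendingDispatches(
	d kvflowcontrol.Dispatch,
	sendAlongRaftTransport func(roachpb.NodeID, []kvflowcontrolpb.AdmittedRaftLogEntries),
) {
	for _, nodeID := range d.PendingDispatch() {
		sendAlongRaftTransport(nodeID, d.PendingDispatchFor(nodeID))
	}
}
```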
5. Two new encodings for raft log entries, `EntryEncoding{Standard,Sideloaded}WithAC`. Raft log entries have a prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, `EntryEncoding{Standard,Sideloaded}` (now renamed to `EntryEncoding{Standard,Sideloaded}WithoutAC`), to indicate whether the entry came with sideloaded data (typically AddSSTs, the storage for which is treated differently for performance). Our two additions here indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll admit the work without blocking.
    - We'll come back to this non-blocking admission in 7. below, even though the implementation is left for a future PR.
    - The decision to use replication admission control happens above raft, and the AC-specific metadata is plumbed down as part of the marshaled raft command, as described in 2. above. A sketch of the prefix-byte dispatch follows this list.
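```go
// Illustrative only: the real prefix bytes/constants live in
// pkg/kv/kvserver/raftlog; the values here are hypothetical stand-ins.
const (
	entryEncodingStandardWithoutAC   byte = iota // marshaled RaftCommand
	entryEncodingSideloadedWithoutAC             // AddSST payload stored out-of-band
	entryEncodingStandardWithAC                  // ditto, but subject to replication AC
	entryEncodingSideloadedWithAC
)

// subjectToReplicationAC reports whether an entry's payload indicates that
// it should be (non-blockingly) admitted below raft.
func subjectToReplicationAC(data []byte) bool {
	if len(data) == 0 {
		return false // e.g. empty entries proposed during leader changes
	}
	switch data[0] {
	case entryEncodingStandardWithAC, entryEncodingSideloadedWithAC:
		return true
	}
	return false
}
```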

6. An (as yet unused) version gate (`V23_1UseEncodingWithBelowRaftAdmissionData`) for replication admission control. Since we're using a different prefix byte for raft commands (point 5. above), one not recognized in earlier CRDB versions, we need explicit versioning.
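The gating check itself would be the usual one-liner, sketched here assuming the standard `clusterversion` plumbing (`st` being a `*cluster.Settings`):
```go
// Only emit the new prefix byte once every node can decode it.
useACEncoding := st.Version.IsActive(ctx, clusterversion.V23_1UseEncodingWithBelowRaftAdmissionData)
```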

7. `AdmitRaftEntry`, on the `kvadmission.Controller` interface. We'll use this as the integration point for log entries received below raft, right as they're being written to storage. This will be non-blocking, since we'll be below raft in the `raft.Ready()` loop; it will effectively enqueue a "virtual" work item in the underlying `StoreWorkQueue` mediating store IO. This virtual work item is what later gets dequeued once the store granter informs the work queue of newly available IO tokens. For standard work queue ordering, our work item needs to include the create time and admission priority. The tenant ID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation), and the store ID to find the right store work queue on multi-store nodes. The `raftpb.Entry` encodes within it the origin node (see 2. above), which is used post-admission to inform the right node of said admission. It looks like:
```go
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage.
AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
```

---

Here's how the various pieces fit together:

<img width="870" alt="image" src="https://user-images.githubusercontent.com/10536690/214457338-1521c94b-7d9a-4d50-8a29-80984d1a706a.png">



95715: bench: add separate-process tenant benchmarks r=yuzefovich a=yuzefovich

Some time ago we merged a change to run all benchmarks in `pkg/bench` against an in-memory tenant. Recently we introduced a local fastpath for in-process in-memory tenants, which resembles how things will look once we get to UA for single-tenant clusters. However, it is also interesting to measure how we perform with separate-process tenants (which is how we run serverless), so this commit introduces that additional config.

Epic: CRDB-14837

Release note: None

96027: ci: get rid of failures in make-based CI r=healthy-pod a=rickystewart

1. We don't post to GitHub from `make`-based CI jobs any more, and
   `github-post` doesn't build cleanly from `make` any more. I delete
   all the code related to `github-post` from `run_json_test`.
2. Delete `teamcity-check-genfiles.sh` which doesn't serve a purpose now
   that `vendor` is gone.

Epic: none
Release note: None

Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
4 people committed Jan 27, 2023
4 parents 67d7728 + 2cac11b + 01a70b0 + 71d4a3f commit 0b49fa7
Showing 30 changed files with 1,042 additions and 175 deletions.
3 changes: 2 additions & 1 deletion .github/CODEOWNERS
@@ -222,7 +222,8 @@
/pkg/kv/kvserver/gc/ @cockroachdb/kv-prs
/pkg/kv/kvserver/idalloc/ @cockroachdb/kv-prs
/pkg/kv/kvserver/intentresolver/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvadmission/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvadmission/ @cockroachdb/admission-control
/pkg/kv/kvserver/kvflowcontrol/ @cockroachdb/admission-control
/pkg/kv/kvserver/kvserverbase/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs
/pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs
50 changes: 0 additions & 50 deletions build/teamcity-check-genfiles.sh

This file was deleted.

25 changes: 0 additions & 25 deletions build/teamcity-support.sh
@@ -44,8 +44,6 @@ run_counter=-1
function run_json_test() {
run_counter=$((run_counter+1))
tc_start_block "prep"
# TODO(tbg): better to go through builder for all of this.
go install github.com/cockroachdb/cockroach/pkg/cmd/github-post
mkdir -p artifacts
tmpfile="artifacts/raw.${run_counter}.json.txt"
tc_end_block "prep"
@@ -60,29 +58,6 @@ function run_json_test() {
set -e
tc_end_block "run"

# Post issues, if on a release branch. Note that we're feeding github-post all
# of the build output; it also does some slow test analysis.
if tc_release_branch; then
if [ -z "${GITHUB_API_TOKEN-}" ]; then
# GITHUB_API_TOKEN must be in the env or github-post will barf if it's
# ever asked to post, so enforce that on all runs.
# The way this env var is made available here is quite tricky. The build
# calling this method is usually a build that is invoked from PRs, so it
# can't have secrets available to it (for the PR could modify
# build/teamcity-* to leak the secret). Instead, we provide the secrets
# to a higher-level job (Publish Bleeding Edge) and use TeamCity magic to
# pass that env var through when it's there. This means we won't have the
# env var on PR builds, but we'll have it for builds that are triggered
# from the release branches.
echo "GITHUB_API_TOKEN must be set"
exit 1
else
tc_start_block "post issues"
github-post < "${tmpfile}"
tc_end_block "post issues"
fi
fi

tc_start_block "artifacts"
# Create (or append to) failures.txt artifact and delete stripped.txt.
(cd "$root"/pkg/cmd/testfilter && go run main.go -mode=omit) < artifacts/stripped.txt | \
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
@@ -1214,6 +1214,8 @@ GO_TARGETS = [
"//pkg/kv/kvserver/intentresolver:intentresolver",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvadmission:kvadmission",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
"//pkg/kv/kvserver/kvstorage:kvstorage",
@@ -2607,6 +2609,8 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/idalloc:get_x_data",
"//pkg/kv/kvserver/intentresolver:get_x_data",
"//pkg/kv/kvserver/kvadmission:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol:get_x_data",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/kvstorage:get_x_data",
1 change: 1 addition & 0 deletions pkg/bench/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/roachpb",
"//pkg/server",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
41 changes: 37 additions & 4 deletions pkg/bench/foreachdb.go
@@ -23,6 +23,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
@@ -51,9 +52,10 @@ func benchmarkCockroach(b *testing.B, f BenchmarkFn) {
f(b, sqlutils.MakeSQLRunner(db))
}

// benchmarkTenantCockroach runs the benchmark against an in-memory tenant in a
// single-node cluster.
func benchmarkTenantCockroach(b *testing.B, f BenchmarkFn) {
// benchmarkInProcessTenantCockroach runs the benchmark against an in-memory
// tenant in a single-node cluster. The tenant runs in the same process as the
// KV host.
func benchmarkInProcessTenantCockroach(b *testing.B, f BenchmarkFn) {
ctx := context.Background()
s, db, _ := serverutils.StartServer(
b, base.TestServerArgs{
@@ -82,6 +84,36 @@ func benchmarkInProcessTenantCockroach(b *testing.B, f BenchmarkFn) {
f(b, sqlutils.MakeSQLRunner(tenantDB))
}

// benchmarkSepProcessTenantCockroach runs the benchmark against a tenant with a
// single SQL pod and a single-node KV host cluster. The tenant runs in a
// separate process from the KV host.
func benchmarkSepProcessTenantCockroach(b *testing.B, f BenchmarkFn) {
ctx := context.Background()
s, db, _ := serverutils.StartServer(
b, base.TestServerArgs{
DisableDefaultTestTenant: true,
})
defer s.Stopper().Stop(ctx)

// Create our own test tenant with a known name.
_, tenantDB := serverutils.StartTenant(b, s, base.TestTenantArgs{
TenantName: "benchtenant",
TenantID: roachpb.MustMakeTenantID(10),
UseDatabase: "bench",
})

// The benchmarks sometime hit the default span limit, so we increase it.
// NOTE(andrei): Benchmarks drop the tables they're creating, so I'm not sure
// if hitting this limit is expected.
_, err := db.Exec(`ALTER TENANT ALL SET CLUSTER SETTING "spanconfig.tenant_limit" = 10000000`)
require.NoError(b, err)

_, err = tenantDB.Exec(`CREATE DATABASE bench`)
require.NoError(b, err)

f(b, sqlutils.MakeSQLRunner(tenantDB))
}

func benchmarkMultinodeCockroach(b *testing.B, f BenchmarkFn) {
tc := testcluster.StartTestCluster(b, 3,
base.TestClusterArgs{
@@ -171,7 +203,8 @@ func benchmarkMySQL(b *testing.B, f BenchmarkFn) {
func ForEachDB(b *testing.B, fn BenchmarkFn) {
for _, dbFn := range []func(*testing.B, BenchmarkFn){
benchmarkCockroach,
benchmarkTenantCockroach,
benchmarkInProcessTenantCockroach,
benchmarkSepProcessTenantCockroach,
benchmarkMultinodeCockroach,
benchmarkPostgres,
benchmarkMySQL,
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
@@ -30,6 +30,7 @@ PROTOBUF_SRCS = [
"//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto",
"//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
"//pkg/kv/kvserver/concurrency/poison:poison_go_proto",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto",
"//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto",
"//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto",
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvadmission/BUILD.bazel
@@ -18,6 +18,7 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_pebble//:pebble",
"@io_etcd_go_raft_v3//raftpb",
],
)

25 changes: 18 additions & 7 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"go.etcd.io/raft/v3/raftpb"
)

// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted
@@ -81,6 +82,14 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting(
true,
)

// ProvisionedBandwidth set a value of the provisioned
// bandwidth for each store in the cluster.
var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
"for each store. It can be over-ridden on a per-store basis using the --store flag",
0).WithPublic()

// Controller provides admission control for the KV layer.
type Controller interface {
// AdmitKVWork must be called before performing KV work.
@@ -108,6 +117,9 @@ type Controller interface {
// replicated to a raft follower, that have not been subject to admission
// control.
FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes)
// AdmitRaftEntry informs admission control of a raft log entry being
// written to storage.
AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry)
}

// TenantWeightProvider can be periodically asked to provide the tenant
@@ -394,13 +406,12 @@ func (n *controllerImpl) FollowerStoreWriteBytes(
followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo)
}

// ProvisionedBandwidth set a value of the provisioned
// bandwidth for each store in the cluster.
var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
"for each store. It can be over-ridden on a per-store basis using the --store flag",
0).WithPublic()
// AdmitRaftEntry implements the Controller interface.
func (n *controllerImpl) AdmitRaftEntry(
roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry,
) {
panic("unimplemented")
}

// FollowerStoreWriteBytes captures stats about writes done to a store by a
// replica that is not the leaseholder. These are used for admission control.
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/kvflowcontrol/BUILD.bazel
@@ -0,0 +1,19 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "kvflowcontrol",
srcs = [
"doc.go",
"kvflowcontrol.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
],
)

get_x_data(name = "get_x_data")