From f29a8e54a1dd5aa0f3fbf65bea921abdccaede48 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Fri, 20 Jan 2023 18:30:18 -0500 Subject: [PATCH] kvflowcontrol,raftlog: interfaces for replication control Follower replication work, today, is not subject to admission control. It consumes IO tokens without waiting, which both (i) does not prevent the LSM from being inverted, and (ii) can cause priority inversion where low-pri follower write work ends up causing IO token exhaustion, which in turn causes throughput and latency impact for high-pri non-follower write work on that same store. This latter behavior was especially noticeable with large index backfills (#82556) where >2/3rds of write traffic on stores could be follower work for large AddSSTs, causing IO token exhaustion for regular write work being proposed on those stores. We last looked at this problem as part of #79215, settling on #83851 which pauses replication traffic to stores close to exceeding their IO overload threshold (data that's periodically gossiped). In large index backfill experiments we found this to help slightly, but it's still a coarse and imperfect solution -- we're deliberately causing under-replication instead of being able to shape the rate of incoming writes for low-pri work closer to the origin. As part of #95563 we're introducing machinery for "replication admission control" -- end-to-end flow control for replication traffic. With it we expect to no longer need to bypass follower write work in admission control and solve the issues mentioned above. Some small degree of familiarity with the design is assumed below. In this first, proto{col,buf}/interface-only PR, we introduce: 1. Package kvflowcontrol{,pb}, which will provide flow control for replication traffic in KV. It will be part of the integration layer between KV and admission control. 
In it we have a few central interfaces: - kvflowcontrol.Controller, held at the node-level and holds all kvflowcontrol.Tokens for each kvflowcontrol.Stream (one per store we're sending raft traffic to and tenant we're sending it for). - kvflowcontrol.Handle, which will be held at the replica-level (only on those who are both leaseholder and raft leader), and will be used to interface with the node-level kvflowcontrol.Controller. When replicating log entries, these replicas choose the log position (term+index) the data is to end up at, and use this handle to track the token deductions on a per log position basis. Later when freeing up tokens (after being informed of said log entries being admitted on the receiving end of the stream), it's done so by specifying the log position up to which we free up all deducted tokens. type Controller interface { Admit(admissionpb.WorkPriority, ...Stream) DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream) ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream) } type Handle interface { Admit(admissionpb.WorkPriority) DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens) ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition) TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition) Close() } 2. kvflowcontrolpb.RaftAdmissionMeta and relevant encoding/decoding routines. RaftAdmissionMeta is 'embedded' within a kvserverpb.RaftCommand, and includes necessary AC metadata on a per raft entry basis. Entries that contain this metadata will make use of the AC-specific raft log entry encodings described below. The AC metadata is decoded below-raft when looking to admit the write work. Also included is the node where this command originated, who wants to eventually learn of this command's admission. message RaftAdmissionMeta { int32 admission_priority = ...; int64 admission_create_time = ...; int32 admission_origin_node = ...; } 3. 
kvflowcontrolpb.AdmittedRaftLogEntries, which now features in kvserverpb.RaftMessageRequest, the unit of what's sent back-and-forth between two nodes over their two uni-directional raft transport streams. AdmittedRaftLogEntries, just like raft heartbeats, is coalesced information about all raft log entries that were admitted below raft. We'll use the origin node encoded in the raft entry (admission_origin_node from above) to know where to send these to. This information is used on the origin node to release flow tokens that were acquired when replicating the original log entries. message AdmittedRaftLogEntries { int64 range_id = ...; int32 admission_priority = ...; RaftLogPosition up_to_raft_log_position = ...; uint64 store_id = ...; } message RaftLogPosition { uint64 term = ...; uint64 index = ...; } 4. kvflowcontrol.Dispatch, which is used to dispatch information about admitted raft log entries (see AdmittedRaftLogEntries from above) to specific nodes where (i) said entries originated, (ii) flow tokens were deducted and (iii) are waiting to be returned. The interface is also used to read pending dispatches, which will be used in the raft transport layer when looking to piggyback information on traffic already bound to specific nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we allow querying for all long-overdue dispatches. The interface looks roughly like: type Dispatch interface { Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries) PendingDispatch() []roachpb.NodeID PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries } 5. Two new encodings for raft log entries, EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have a prefix byte that informs decoding routines how to interpret the subsequent bytes. 
To date we've had two, EntryEncoding{Standard,Sideloaded} (now renamed to EntryEncoding{Standard,Sideloaded}WithoutAC), to indicate whether the entry came with sideloaded data (these are typically AddSSTs, the storage for which is treated differently for performance). Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll admit the work without blocking. - We'll come back to this non-blocking admission in the AdmitRaftEntry section below, even though the implementation is left for a future PR. - The decision to use replication admission control happens above raft, and AC-specific metadata is plumbed down as part of the marshaled raft command, as described for RaftAdmissionMeta above. 6. An unused version gate (V23_1UseEncodingWithBelowRaftAdmissionData) to use replication admission control. Since we're using a different prefix byte for raft commands (see EntryEncodings above), one not recognized in earlier CRDB versions, we need explicit versioning. 7. AdmitRaftEntry, on the kvadmission.Controller interface. We'll use this as the integration point for log entries received below raft, right as they're being written to storage. This will be non-blocking since we'll be below raft in the raft.Ready() loop, and will effectively enqueue a "virtual" work item in underlying StoreWorkQueue mediating store IO. This virtual work item is what later gets dequeued once the store granter informs the work queue of newly available IO tokens. For standard work queue ordering, our work item needs to include the create time and admission pri. The tenant ID is plumbed to find the right tenant heap to queue it under (for inter-tenant isolation); the store ID to find the right store work queue on multi-store nodes. 
The raftpb.Entry encodes within it its origin node (see RaftAdmissionMeta above), which is used post-admission to inform the right node of said admission. It looks like: // AdmitRaftEntry informs admission control of a raft log entry being // written to storage. AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) --- .github/CODEOWNERS | 3 +- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/BUILD.bazel | 4 + pkg/clusterversion/cockroach_versions.go | 10 ++ pkg/gen/protobuf.bzl | 1 + pkg/kv/kvserver/kvadmission/BUILD.bazel | 1 + pkg/kv/kvserver/kvadmission/kvadmission.go | 25 ++- pkg/kv/kvserver/kvflowcontrol/BUILD.bazel | 19 +++ pkg/kv/kvserver/kvflowcontrol/doc.go | 94 ++++++++++ .../kvserver/kvflowcontrol/kvflowcontrol.go | 125 ++++++++++++++ .../kvflowcontrol/kvflowcontrolpb/BUILD.bazel | 35 ++++ .../kvflowcontrolpb/kvflowcontrol.proto | 118 +++++++++++++ .../kvflowcontrolpb/raft_log_position.go | 42 +++++ pkg/kv/kvserver/kvserverpb/BUILD.bazel | 2 + pkg/kv/kvserver/kvserverpb/proposer_kv.proto | 18 ++ pkg/kv/kvserver/kvserverpb/raft.proto | 5 + pkg/kv/kvserver/logstore/logstore.go | 4 +- .../kvserver/logstore/logstore_bench_test.go | 2 +- pkg/kv/kvserver/logstore/sideload.go | 10 +- pkg/kv/kvserver/logstore/sideload_test.go | 16 +- .../kvserver/loqrecovery/recovery_env_test.go | 2 +- pkg/kv/kvserver/raft.go | 13 +- pkg/kv/kvserver/raftlog/BUILD.bazel | 5 + pkg/kv/kvserver/raftlog/encoding.go | 161 +++++++++++++----- pkg/kv/kvserver/raftlog/encoding_test.go | 107 ++++++++++++ pkg/kv/kvserver/raftlog/entry.go | 39 +++-- pkg/kv/kvserver/raftlog/entry_test.go | 2 +- pkg/kv/kvserver/raftlog/iter_bench_test.go | 18 +- pkg/kv/kvserver/raftlog/iter_test.go | 6 +- pkg/kv/kvserver/replica_proposal_buf_test.go | 2 +- pkg/kv/kvserver/replica_raft.go | 6 +- pkg/kv/kvserver/replica_raft_quiesce.go | 2 +- 33 files changed, 792 insertions(+), 109 deletions(-) create mode 100644 
pkg/kv/kvserver/kvflowcontrol/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvflowcontrol/doc.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto create mode 100644 pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go create mode 100644 pkg/kv/kvserver/raftlog/encoding_test.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index cdf2a08dde8a..ef0d89ea7007 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -222,7 +222,8 @@ /pkg/kv/kvserver/gc/ @cockroachdb/kv-prs /pkg/kv/kvserver/idalloc/ @cockroachdb/kv-prs /pkg/kv/kvserver/intentresolver/ @cockroachdb/kv-prs -/pkg/kv/kvserver/kvadmission/ @cockroachdb/kv-prs +/pkg/kv/kvserver/kvadmission/ @cockroachdb/admission-control +/pkg/kv/kvserver/kvflowcontrol/ @cockroachdb/admission-control /pkg/kv/kvserver/kvserverbase/ @cockroachdb/kv-prs /pkg/kv/kvserver/kvserverpb/ @cockroachdb/kv-prs /pkg/kv/kvserver/kvstorage/ @cockroachdb/repl-prs diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 1d256d3e947e..529b3cafc7d2 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 1000022.2-30 set the active cluster version in the format '.' 
+version version 1000022.2-32 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index bb9b50924c4f..0ef19edc2bac 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -235,6 +235,6 @@
trace.opentelemetry.collector
stringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled
booleantrueif set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector
stringthe address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. -
version
version1000022.2-30set the active cluster version in the format '<major>.<minor>' +
version
version1000022.2-32set the active cluster version in the format '<major>.<minor>' diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 45e60760d079..5d389ed647d6 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1205,6 +1205,8 @@ GO_TARGETS = [ "//pkg/kv/kvserver/intentresolver:intentresolver", "//pkg/kv/kvserver/intentresolver:intentresolver_test", "//pkg/kv/kvserver/kvadmission:kvadmission", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb", + "//pkg/kv/kvserver/kvflowcontrol:kvflowcontrol", "//pkg/kv/kvserver/kvserverbase:kvserverbase", "//pkg/kv/kvserver/kvserverpb:kvserverpb", "//pkg/kv/kvserver/kvstorage:kvstorage", @@ -2587,6 +2589,8 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/idalloc:get_x_data", "//pkg/kv/kvserver/intentresolver:get_x_data", "//pkg/kv/kvserver/kvadmission:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol:get_x_data", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", "//pkg/kv/kvserver/kvserverpb:get_x_data", "//pkg/kv/kvserver/kvstorage:get_x_data", diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 92bb5cc6d747..d6a0cbbbfa49 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -401,6 +401,12 @@ const ( // chagnefeeds created prior to this version. V23_1_ChangefeedExpressionProductionReady + // V23_1UseEncodingWithBelowRaftAdmissionData enables the use of raft + // command encodings that include below-raft admission control data. + // + // TODO(irfansharif): Actually use this. + V23_1UseEncodingWithBelowRaftAdmissionData + // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. 
@@ -687,6 +693,10 @@ var rawVersionsSingleton = keyedVersions{ Key: V23_1_ChangefeedExpressionProductionReady, Version: roachpb.Version{Major: 22, Minor: 2, Internal: 30}, }, + { + Key: V23_1UseEncodingWithBelowRaftAdmissionData, + Version: roachpb.Version{Major: 22, Minor: 2, Internal: 32}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/gen/protobuf.bzl b/pkg/gen/protobuf.bzl index 1407962d7e56..0e381fc0d711 100644 --- a/pkg/gen/protobuf.bzl +++ b/pkg/gen/protobuf.bzl @@ -29,6 +29,7 @@ PROTOBUF_SRCS = [ "//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto", "//pkg/kv/kvserver/concurrency/lock:lock_go_proto", "//pkg/kv/kvserver/concurrency/poison:poison_go_proto", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_go_proto", "//pkg/kv/kvserver/kvserverpb:kvserverpb_go_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_go_proto", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb:loqrecoverypb_go_proto", diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel index 44573b5e43fc..3c8953754572 100644 --- a/pkg/kv/kvserver/kvadmission/BUILD.bazel +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_pebble//:pebble", + "@io_etcd_go_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index c9d4a98b30cb..92fb01f8a832 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "go.etcd.io/raft/v3/raftpb" ) // elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted @@ -81,6 +82,14 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting( true, ) +// 
ProvisionedBandwidth set a value of the provisioned +// bandwidth for each store in the cluster. +var ProvisionedBandwidth = settings.RegisterByteSizeSetting( + settings.SystemOnly, "kvadmission.store.provisioned_bandwidth", + "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ + "for each store. It can be over-ridden on a per-store basis using the --store flag", + 0).WithPublic() + // Controller provides admission control for the KV layer. type Controller interface { // AdmitKVWork must be called before performing KV work. @@ -108,6 +117,9 @@ type Controller interface { // replicated to a raft follower, that have not been subject to admission // control. FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes) + // AdmitRaftEntry informs admission control of a raft log entry being + // written to storage. + AdmitRaftEntry(roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry) } // TenantWeightProvider can be periodically asked to provide the tenant @@ -394,13 +406,12 @@ func (n *controllerImpl) FollowerStoreWriteBytes( followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo) } -// ProvisionedBandwidth set a value of the provisioned -// bandwidth for each store in the cluster. -var ProvisionedBandwidth = settings.RegisterByteSizeSetting( - settings.SystemOnly, "kvadmission.store.provisioned_bandwidth", - "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ - "for each store. It can be over-ridden on a per-store basis using the --store flag", - 0).WithPublic() +// AdmitRaftEntry implements the Controller interface. +func (n *controllerImpl) AdmitRaftEntry( + roachpb.TenantID, roachpb.StoreID, roachpb.RangeID, raftpb.Entry, +) { + panic("unimplemented") +} // FollowerStoreWriteBytes captures stats about writes done to a store by a // replica that is not the leaseholder. These are used for admission control. 
diff --git a/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel new file mode 100644 index 000000000000..04ef54d4893d --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/BUILD.bazel @@ -0,0 +1,19 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "kvflowcontrol", + srcs = [ + "doc.go", + "kvflowcontrol.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + "//pkg/roachpb", + "//pkg/util/admission/admissionpb", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/doc.go b/pkg/kv/kvserver/kvflowcontrol/doc.go new file mode 100644 index 000000000000..a76a0c3a1a99 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/doc.go @@ -0,0 +1,94 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontrol + +// TODO(irfansharif): After implementing these interfaces and integrating it +// into KV, write a "life of a replicated proposal" timeline here for flow-token +// interactions. Talk about how range splits/merges interact and how we ensure +// now flow tokens are leaked or double returned. Talk also about snapshots, log +// truncations, leader/leaseholder changes, leaseholder != leader, follower +// pausing, re-proposals (the token deduction is tracked for the first attempt), +// lossy raft transport send/recv buffers, and raft membership changing. 
+ +// This package contains machinery for "replication admission control" -- +// end-to-end flow control for replication traffic. It's part of the integration +// layer between KV and admission control. There are a few components, in and +// out of this package. +// +// 1. The central interfaces/types in this package are: +// - kvflowcontrol.Controller, held at the node-level and holds all available +// kvflowcontrol.Tokens for each kvflowcontrol.Stream. +// - kvflowcontrol.Tokens represent the finite capacity of a given stream, +// expressed in bytes we're looking to replicate over the given stream. +// - kvflowcontrol.Stream models the stream over which we replicate data +// traffic, transmission for which we regulate using flow control. It's +// segmented by the specific store the traffic is bound for, and also the +// tenant driving it. +// - kvflowcontrol.Handle is held at the replica-level (only on those who are +// both leaseholder and raft leader), and is used to interface with +// the node-level kvflowcontrol.Controller. When replicating log entries, +// these replicas choose the log position (term+index) the data is to end +// up at, and use this handle to track the token deductions on a per log +// position basis. After being informed of these log entries being admitted +// by the receiving end of the kvflowcontrol.Stream, it frees up the +// tokens. +// +// 2. kvflowcontrolpb.RaftAdmissionMeta, embedded within each +// kvserverpb.RaftCommand, includes all necessary information for below-raft +// IO admission control. Also included is the node where this command +// originated, who wants to eventually learn of this command's admission. +// Entries that contain this metadata make use of AC-specific raft log entry +// encodings described below. +// +// 3. kvflowcontrolpb.AdmittedRaftLogEntries, piggybacked as part of +// kvserverpb.RaftMessageRequest[1], contains coalesced information about all +// raft log entries that were admitted below raft. 
We use the origin node +// encoded in raft entry (RaftAdmissionMeta.AdmissionOriginNode) to know +// where to send these to. This information is used on the origin node to +// release flow tokens that were acquired when replicating the original log +// entries. +// +// 4. kvflowcontrol.Dispatch is used to dispatch information about +// admitted raft log entries (AdmittedRaftLogEntries) to the specific nodes +// where (i) said entries originated, (ii) flow tokens were deducted and +// (iii) are waiting to be returned. The interface is also used to read +// pending dispatches, which will be used in the raft transport layer when +// looking to piggyback information on traffic already bound to specific +// nodes. Since timely dispatching (read: piggybacking) is not guaranteed, we +// allow querying for all long-overdue dispatches. +// +// 5. We use specific encodings for raft log entries that contain AC data: +// EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have prefix +// byte that informs decoding routines how to interpret the subsequent bytes. +// Since we don't want to decode anything if the command is not subject to +// replication admission control, the encoding type is a convenient place to +// capture how a specific entry is to be considered. +// - The decision to use replication admission control happens above raft +// (using cluster settings, version gates), and AC-specific metadata is +// plumbed down as part of the marshaled raft command (RaftAdmissionMeta). +// +// 6. AdmitRaftEntry, on the kvadmission.Controller is the integration +// point for log entries received below raft right as they're being written +// to storage. This is non-blocking since we're below raft in the +// raft.Ready() loop. It effectively enqueues a "virtual" work item in +// underlying StoreWorkQueue mediating store IO. This virtual work item is +// what later gets dequeued once the store granter informs the work queue of +// newly available IO tokens. 
For standard work queue ordering, our work item +// needs to include the CreateTime and AdmissionPriority. The tenant ID is +// plumbed to find the right tenant heap to queue it under (for inter-tenant +// isolation); the store ID to find the right store work queue on multi-store +// nodes. Since the raftpb.Entry encodes within it its origin node +// (AdmissionOriginNode), it's used post-admission to dispatch to the right +// node. +// +// [1]: kvserverpb.RaftMessageRequest is the unit of what's sent +// back-and-forth between two nodes over their two uni-directional raft +// transport streams. diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go new file mode 100644 index 000000000000..21f30aa6a93a --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go @@ -0,0 +1,125 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package kvflowcontrol provides flow control for replication traffic in KV. +// It's part of the integration layer between KV and admission control. +package kvflowcontrol + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" +) + +// Stream models the stream over which we replicate data traffic, the +// transmission for which we regulate using flow control. It's segmented by the +// specific store the traffic is bound for and the tenant driving it. 
Despite +// the underlying link/transport being shared across tenants, modeling streams +// on a per-tenant basis helps provide inter-tenant isolation. +type Stream struct { + TenantID roachpb.TenantID + StoreID roachpb.StoreID +} + +// Tokens represent the finite capacity of a given stream, expressed in bytes +// for data we're looking to replicate. Use of replication streams are +// predicated on tokens being available. +type Tokens uint64 + +// Controller provides flow control for replication traffic in KV, held at the +// node-level. +type Controller interface { + // Admit seeks admission to replicate data of a given priority (regardless + // of size) over the specified streams. This is a blocking operation; + // requests wait until there are flow tokens available. + Admit(admissionpb.WorkPriority, ...Stream) + // DeductTokens deducts (without blocking) flow tokens for transmission over + // the given streams, for work with a given priority. Requests are expected + // to have been Admit()-ed first. + DeductTokens(admissionpb.WorkPriority, Tokens, ...Stream) + // ReturnTokens returns flow tokens for the given streams. These tokens are + // expected to have been deducted earlier with a specific priority; that + // same priority is what's specified here. + ReturnTokens(admissionpb.WorkPriority, Tokens, ...Stream) + + // TODO(irfansharif): We might need the ability to "disable" specific + // streams/corresponding token buckets when there are failures or + // replication to a specific store is paused due to follower-pausing. + // That'll have to show up between the Handler and the Controller somehow. +} + +// Handle is used to interface with replication flow control; it's typically +// backed by a node-level Controller. Handles are held on replicas initiating +// replication traffic, i.e. are both the leaseholder and raft leader. 
When +// replicating log entries, these replicas choose the log position (term+index) +// the data is to end up at, and use this handle to track the token deductions +// on a per log position basis. Later when freeing up tokens (typically after +// being informed of said log entries being admitted on the receiving end of the +// stream), it's done so by specifying the log position up to which we free up +// all deducted tokens. See kvflowcontrolpb.AdmittedRaftLogEntries for more +// details. +type Handle interface { + // Admit seeks admission to replicate data of a given priority (regardless + // of size). This is a blocking operation; requests wait until there are + // flow tokens available. + Admit(admissionpb.WorkPriority) + // DeductTokensFor deducts flow tokens for replicating data of a given + // priority to members of the raft group, and tracks it with respect to the + // specific raft log position it's expecting it to end up in. Requests are + // assumed to have been Admit()-ed first. + DeductTokensFor(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition, Tokens) + // ReturnTokensUpto returns all previously deducted tokens of a given + // priority for all log positions less than or equal to the one specified. + // Once returned, subsequent attempts to return upto the same position or + // lower are no-ops. + ReturnTokensUpto(admissionpb.WorkPriority, kvflowcontrolpb.RaftLogPosition) + // TrackLowWater is used to set a low-water mark for a given replication + // stream. Tokens held below this position are returned back to the + // underlying Controller, regardless of priority. All subsequent returns at + // that position or lower are ignored. + // + // NB: This is used when a replica on the other end of a stream gets caught + // up via snapshot (say, after a log truncation), where we then don't expect + // dispatches for the individual AdmittedRaftLogEntries between what it + // admitted last and its latest RaftLogPosition. 
Another use is during + // successive lease changes (out and back) within the same raft term -- we + // want to both free up tokens from when we lost the lease, and also ensure + // that attempts to return them (on hearing about AdmittedRaftLogEntries + // replicated under the earlier lease), we discard the attempts. + TrackLowWater(Stream, kvflowcontrolpb.RaftLogPosition) + // Close closes the handle and returns all held tokens back to the + // underlying controller. Typically used when the replica loses its lease + // and/or raft leadership, or ends up getting GC-ed (if it's being + // rebalanced, merged away, etc). + Close() +} + +// Dispatch is used to dispatch information about admitted raft log entries to +// specific nodes and read pending dispatches. +type Dispatch interface { + DispatchWriter + DispatchReader +} + +// DispatchWriter is used to dispatch information about admitted raft log +// entries to specific nodes (typically where said entries originated, where +// flow tokens were deducted and waiting to be returned). +type DispatchWriter interface { + Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries) +} + +// DispatchReader is used to read pending dispatches. It's used in the raft +// transport layer when looking to piggyback information on traffic already +// bound to specific nodes. It's also used when timely dispatching (read: +// piggybacking) has not taken place. 
+type DispatchReader interface { + PendingDispatch() []roachpb.NodeID + PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel new file mode 100644 index 000000000000..0a2fd87eda43 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/BUILD.bazel @@ -0,0 +1,35 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@rules_proto//proto:defs.bzl", "proto_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library") + +proto_library( + name = "kvflowcontrolpb_proto", + srcs = ["kvflowcontrol.proto"], + strip_import_prefix = "/pkg", + visibility = ["//visibility:public"], + deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"], +) + +go_proto_library( + name = "kvflowcontrolpb_go_proto", + compilers = ["//pkg/cmd/protoc-gen-gogoroach:protoc-gen-gogoroach_compiler"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + proto = ":kvflowcontrolpb_proto", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", # keep + "@com_github_gogo_protobuf//gogoproto", + ], +) + +go_library( + name = "kvflowcontrolpb", + srcs = ["raft_log_position.go"], + embed = [":kvflowcontrolpb_go_proto"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", + visibility = ["//visibility:public"], + deps = ["@com_github_cockroachdb_redact//:redact"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto new file mode 100644 index 000000000000..39ec002d156d --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto @@ -0,0 +1,118 @@ +// Copyright 2023 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +syntax = "proto3"; +package cockroach.kv.kvserver.kvflowcontrol.kvflowcontrolpb; +option go_package = "kvflowcontrolpb"; + +import "gogoproto/gogo.proto"; + +// RaftAdmissionMeta contains information used by admission control for the +// select raft commands that use replication admission control. It contains a +// subset of the fields in RaftCommand to selectively decode state. When +// marshaling a RaftCommand, we willfully include this data in the prefix of the +// marshaled byte buffer. Information about whether this data is present is +// captured in the first byte of the encoded raft proposal -- see +// raftlog.EntryEncoding. +message RaftAdmissionMeta { + // AdmissionPriority of the command (maps to admission.WorkPriority); used + // within a tenant below-raft for replication admission control. + int32 admission_priority = 18; + // AdmissionCreateTime is equivalent to Time.UnixNano() from the creation time + // of the request, or a parent request, for which this command is a part of. + // It's used within a tenant below-raft for replication admission control; see + // admission.WorkInfo.CreateTime for details. + int64 admission_create_time = 19; + // AdmissionOriginNode captures where this raft command originated. It's used + // to inform said node of this raft command's (virtual) admission in order for + // it to release flow tokens for subsequent commands. 
+ int32 admission_origin_node = 20 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + + // TODO(irfansharif): If the {marshaling,unmarshaling} performance overhead + // proves costly, we could: + // - For Admission{Priority,CreateTime}, pack them within a single int64 by + // using 8 bits for the priority (we're using an int8 in Go code) and the + // remaining bits for the create timestamp with lower resolution. + // - For AdmissionOriginNode, we could re-work the MultiRaft streaming RPCs + // to include upfront, during stream setup, which node the subsequent + // RaftMessageRequests are coming from. But this is awkward to do with our + // current code layering: + // - We want to find out on a per raftpb.Entry level where it came from, and + // to do it once raft.Ready() tells us to persist said entry into our raft log. + // - We're currently encoding this data in the raft entry itself, at the + // sender, so it's easy to decode at the right place in the + // raft-ready-handling loop. + // - But if we had to "stitch in" the origin node ID once received off of + // the transport, or tie together raft entries with their origin node IDs + // through some other way (the raft library only wants to "step" + // through message types we can't so easily annotate), we'd have to do a + // fair bit of state tracking. + // If it's still too costly, we could rip all this out and coarsen + // intra-tenant ordering with respect to Admission{Priority,CreateTime}. We + // could instead introduce a WorkQueue-like ordering at the origin where + // requests wait for flow tokens for every <tenant,store> it + // intends to write to. Below raft we could live with just side-loaded + // proposals being marked as admissionpb.BulkNormalPri. Origin-side ordering + // would work ok for epoch-LIFO. The coarseness comes from this re-ordering + // only happening on individual origin nodes.
+ // + // TODO(irfansharif): Get rid of this TODO block after simple performance + // benchmarks (say, `cockroach workload run kv` with high concurrency and + // small write sizes). The ideas above are too complicated. +} + +// AdmittedRaftLogEntries represents a set of raft log entries that were +// admitted below raft. These are identified by: +// - the range ID (there's one per raft group); +// - the admission priority of all said entries; +// - the (inclusive) raft log position up-to-which we've admitted entries; +// - the store ID on which these raft logs were admitted. +// +// This is used as part of replication admission control to release, at the origin, +// the specific flow tokens acquired when replicating these log entries along +// this particular "replication stream" (i.e. flowing to a particular store, +// remote or otherwise). +message AdmittedRaftLogEntries { + // RangeID of the raft group these entries belong to. This is the range on + // whose behalf work was admitted. + int64 range_id = 1 [(gogoproto.customname) = "RangeID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeID"]; + + // AdmissionPriority of all admitted entries. + int32 admission_priority = 2; + + // RaftLogPosition (inclusive) of the highest entry that was admitted. Within + // a given priority, admission takes place in raft log order (i.e. entries + // with lower terms get admitted first, or with lower indexes within the same + // term). So the value here implies admission of all entries that sort before + // and have the same priority. + RaftLogPosition raft_log_position = 3 [(gogoproto.nullable) = false]; + + // StoreID on which this raft log entry was admitted. + // + // TODO(irfansharif): We can avoid sending this for every logically admitted + // message if the raft transport stream we were sending it on had some + // handshake protocol at the start, where the client identified itself by its + // NodeID.
That way the origin replica receiving this information can infer + // the StoreID where this work was done (since we never store + // multiple replicas of a range on the same node, even if containing multiple + // stores). + uint64 store_id = 4 [(gogoproto.customname) = "StoreID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID"]; +} + +// RaftLogPosition is a point on the raft log, identified by a term and an +// index. +message RaftLogPosition { + option (gogoproto.goproto_stringer) = false; + + uint64 term = 1; + uint64 index = 2; +} diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go new file mode 100644 index 000000000000..c147618928fb --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb/raft_log_position.go @@ -0,0 +1,42 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvflowcontrolpb + +import "github.com/cockroachdb/redact" + +func (p *RaftLogPosition) String() string { + return redact.StringWithoutMarkers(p) +} + +// SafeFormat implements the redact.SafeFormatter interface. +func (p *RaftLogPosition) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("position=%d/%d", p.Term, p.Index) +} + +// Equal returns whether the two raft log positions are identical. +func (p *RaftLogPosition) Equal(o RaftLogPosition) bool { + return p.Term == o.Term && p.Index == o.Index +} + +// Less returns whether one raft log position is less than the other. Those +// with lower terms sort first, and barring that, those with lower indexes.
+func (p *RaftLogPosition) Less(o RaftLogPosition) bool { + if p.Term != o.Term { + return p.Term < o.Term + } + return p.Index < o.Index +} + +// LessEq returns whether one raft log position is less than or equal to the +// other. +func (p *RaftLogPosition) LessEq(o RaftLogPosition) bool { + return p.Less(o) || p.Equal(o) +} diff --git a/pkg/kv/kvserver/kvserverpb/BUILD.bazel b/pkg/kv/kvserver/kvserverpb/BUILD.bazel index c7259fa8dc0a..f2bf110a9e7c 100644 --- a/pkg/kv/kvserver/kvserverpb/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverpb/BUILD.bazel @@ -34,6 +34,7 @@ proto_library( strip_import_prefix = "/pkg", visibility = ["//visibility:public"], deps = [ + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb:kvflowcontrolpb_proto", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb_proto", "//pkg/kv/kvserver/readsummary/rspb:rspb_proto", "//pkg/roachpb:roachpb_proto", @@ -55,6 +56,7 @@ go_proto_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv/kvserver/closedts/ctpb", # keep + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/readsummary/rspb", "//pkg/roachpb", diff --git a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto index eb9e7cbd4c73..7a92a871682d 100644 --- a/pkg/kv/kvserver/kvserverpb/proposer_kv.proto +++ b/pkg/kv/kvserver/kvserverpb/proposer_kv.proto @@ -358,6 +358,24 @@ message RaftCommand { // from" the proposer. map<string, string> trace_data = 16; + // Fields used below-raft for replication admission control. See + // kvflowcontrolpb.RaftAdmissionMeta for how this data is selectively decoded. + + // AdmissionPriority of the command (maps to admission.WorkPriority); used + // within a tenant below-raft for replication admission control. + int32 admission_priority = 18; + + // AdmissionCreateTime is equivalent to Time.UnixNano() from the creation time + // of the request (or a parent request) for which this command is a part of.
+ // It's used within a tenant below-raft for replication admission control; see + // admission.WorkInfo.CreateTime for details. + int64 admission_create_time = 19; + + // AdmissionOriginNode captures where this raft command originated. It's used + // to inform said node of this raft command's (virtual) admission in order for + // it to release flow tokens for subsequent commands. + int32 admission_origin_node = 20 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.NodeID"]; + reserved 1, 2, 10001 to 10014; } diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4ec5d0288aed..229af24b873d 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -18,6 +18,7 @@ import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; +import "kv/kvserver/kvflowcontrol/kvflowcontrolpb/kvflowcontrol.proto"; import "raft/v3/raftpb/raft.proto"; import "gogoproto/gogo.proto"; import "util/tracing/tracingpb/recorded_span.proto"; @@ -91,6 +92,10 @@ message RaftMessageRequest { repeated RaftHeartbeat heartbeats = 6 [(gogoproto.nullable) = false]; repeated RaftHeartbeat heartbeat_resps = 7 [(gogoproto.nullable) = false]; + // AdmittedRaftLogEntries is coalesced information about all raft log entries + // that were admitted below raft. 
+ repeated kv.kvserver.kvflowcontrol.kvflowcontrolpb.AdmittedRaftLogEntries admitted_raft_log_entries = 11 [(gogoproto.nullable) = false]; + reserved 10; } diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index e53e9c17f756..f1784431b0f5 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -328,7 +328,7 @@ func LoadTerm( if err != nil { return 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */) } return entry.Term, nil @@ -405,7 +405,7 @@ func LoadEntries( if err != nil { return err } - if typ == raftlog.EntryEncodingSideloaded { + if typ.IsSideloaded() { newEnt, err := MaybeInlineSideloadedRaftCommand( ctx, rangeID, ent, sideloaded, eCache, ) diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go index e2e31c257ff8..0c3f8e0473e2 100644 --- a/pkg/kv/kvserver/logstore/logstore_bench_test.go +++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go @@ -76,7 +76,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { Term: 1, Index: 1, Type: raftpb.EntryNormal, - Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, "deadbeef", data), + Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data), }) stats := &AppendStats{} diff --git a/pkg/kv/kvserver/logstore/sideload.go b/pkg/kv/kvserver/logstore/sideload.go index 9b6f60f0e899..2c21a24fae44 100644 --- a/pkg/kv/kvserver/logstore/sideload.go +++ b/pkg/kv/kvserver/logstore/sideload.go @@ -86,7 +86,7 @@ func MaybeSideloadEntries( if err != nil { return nil, 0, 0, 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { otherEntriesSize += int64(len(input[i].Data)) continue } @@ -124,7 +124,7 @@ func MaybeSideloadEntries( // TODO(tbg): this should be supported by a method as well. 
{ data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command") @@ -165,7 +165,7 @@ func MaybeInlineSideloadedRaftCommand( if err != nil { return nil, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return nil, nil } log.Event(ctx, "inlining sideloaded SSTable") @@ -213,7 +213,7 @@ func MaybeInlineSideloadedRaftCommand( // the EntryEncoding. { data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, err @@ -232,7 +232,7 @@ func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry) if err != nil { log.Fatalf(ctx, "%v", err) } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return } diff --git a/pkg/kv/kvserver/logstore/sideload_test.go b/pkg/kv/kvserver/logstore/sideload_test.go index 4d9cc93e36f1..9867e1b20870 100644 --- a/pkg/kv/kvserver/logstore/sideload_test.go +++ b/pkg/kv/kvserver/logstore/sideload_test.go @@ -51,7 +51,7 @@ func mustEntryEq(t testing.TB, l, r raftpb.Entry) { } func mkEnt( - v byte, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, + enc raftlog.EntryEncoding, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, ) raftpb.Entry { cmdIDKey := strings.Repeat("x", raftlog.RaftCommandIDLen) var cmd kvserverpb.RaftCommand @@ -62,7 +62,7 @@ func mkEnt( } var ent 
raftpb.Entry ent.Index, ent.Term = index, term - ent.Data = raftlog.EncodeRaftCommand(v, kvserverbase.CmdIDKey(cmdIDKey), b) + ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b) return ent } @@ -355,7 +355,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - v1, v2 := raftlog.EntryEncodingStandardPrefixByte, raftlog.EntryEncodingSideloadedPrefixByte + v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC rangeID := roachpb.RangeID(1) type testCase struct { @@ -477,11 +477,11 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { addSSTStripped := addSST addSSTStripped.Data = nil - entV1Reg := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 10, 99, nil) - entV1SST := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 11, 99, &addSST) - entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 12, 99, nil) - entV2SST := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSST) - entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSSTStripped) + entV1Reg := mkEnt(raftlog.EntryEncodingStandardWithAC, 10, 99, nil) + entV1SST := mkEnt(raftlog.EntryEncodingStandardWithAC, 11, 99, &addSST) + entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 12, 99, nil) + entV2SST := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSST) + entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSSTStripped) type tc struct { name string diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go index b0bf5a05cb1a..f90dd1072d91 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go @@ -380,7 +380,7 @@ func raftLogFromPendingDescriptorUpdate( t.Fatalf("failed to serialize raftCommand: %v", err) } data := raftlog.EncodeRaftCommand( - raftlog.EntryEncodingStandardPrefixByte, 
kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) + raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) ent := raftpb.Entry{ Term: 1, Index: replica.RaftCommittedIndex + entryIndex, diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 8ba284ad6e06..6e1b6fdb62a1 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -221,9 +221,9 @@ func raftEntryFormatter(data []byte) string { } // NB: a raft.EntryFormatter is only invoked for EntryNormal (raft methods // that call this take care of unwrapping the ConfChange), and since - // len(data)>0 it has to be EntryEncodingStandard or EntryEncodingSideloaded and - // they are encoded identically. - cmdID, data := raftlog.DecomposeRaftVersionStandardOrSideloaded(data) + // len(data)>0 it has to be EntryEncoding{Standard,Sideloaded}With{,out}AC + // and they are encoded identically. + cmdID, data := raftlog.DecomposeRaftEncodingStandardOrSideloaded(data) return fmt.Sprintf("[%x] [%d]", cmdID, len(data)) } @@ -274,8 +274,11 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase continue } switch typ { - case raftlog.EntryEncodingStandard, raftlog.EntryEncodingSideloaded: - id, _ := raftlog.DecomposeRaftVersionStandardOrSideloaded(e.Data) + case raftlog.EntryEncodingStandardWithAC, + raftlog.EntryEncodingSideloadedWithAC, + raftlog.EntryEncodingStandardWithoutAC, + raftlog.EntryEncodingSideloadedWithoutAC: + id, _ := raftlog.DecomposeRaftEncodingStandardOrSideloaded(e.Data) ids = append(ids, id) case raftlog.EntryEncodingRaftConfChange, raftlog.EntryEncodingRaftConfChangeV2: // Configuration changes don't have the CmdIDKey easily accessible but are diff --git a/pkg/kv/kvserver/raftlog/BUILD.bazel b/pkg/kv/kvserver/raftlog/BUILD.bazel index 710e68417394..628b21ea93d4 100644 --- a/pkg/kv/kvserver/raftlog/BUILD.bazel +++ b/pkg/kv/kvserver/raftlog/BUILD.bazel @@ -14,6 +14,7 @@ go_library( deps = [ "//pkg/keys",
"//pkg/kv/kvserver/apply", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", @@ -29,6 +30,7 @@ go_library( go_test( name = "raftlog_test", srcs = [ + "encoding_test.go", "entry_bench_test.go", "entry_test.go", "iter_bench_test.go", @@ -38,12 +40,15 @@ go_test( embed = [":raftlog"], deps = [ "//pkg/keys", + "//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/admission/admissionpb", "//pkg/util/hlc", + "//pkg/util/humanizeutil", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/protoutil", diff --git a/pkg/kv/kvserver/raftlog/encoding.go b/pkg/kv/kvserver/raftlog/encoding.go index dd52176a0607..d7011b634bf1 100644 --- a/pkg/kv/kvserver/raftlog/encoding.go +++ b/pkg/kv/kvserver/raftlog/encoding.go @@ -13,7 +13,9 @@ package raftlog import ( "fmt" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) // EntryEncoding enumerates the encodings used in CockroachDB for raftpb.Entry's @@ -23,83 +25,148 @@ import ( // and, in some cases, the first byte of the Entry's Data payload. type EntryEncoding byte -// TODO(tbg): use auto-assigned consts (iota) for the encodings below, since -// they aren't on the wire. - const ( - // EntryEncodingStandard is the default encoding for a CockroachDB raft log - // entry. + // EntryEncodingEmpty is an empty entry. These are used by raft after + // leader election. Since they hold no data, there is nothing in them to + // decode. + EntryEncodingEmpty EntryEncoding = iota + // EntryEncodingStandardWithAC is the default encoding for a CockroachDB + // raft log entry. 
// - // This is a raftpb.Entry of type EntryNormal whose Data slice is either empty - // or whose first byte matches EntryEncodingStandardPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a - // kvserverpb.RaftCommand. - EntryEncodingStandard EntryEncoding = 0 - // EntryEncodingSideloaded indicates a proposal representing the result of a - // roachpb.AddSSTableRequest for which the payload (the SST) is stored outside - // the storage engine to improve storage performance. + // This is a raftpb.Entry of type EntryNormal whose Data slice is either + // empty or whose first byte matches entryEncodingStandardWithACPrefixByte. + // The subsequent eight bytes represent a CmdIDKey. The remaining bytes + // represent a kvserverpb.RaftCommand that also includes below-raft + // admission control metadata. + EntryEncodingStandardWithAC + // EntryEncodingSideloadedWithAC indicates a proposal representing the + // result of a roachpb.AddSSTableRequest for which the payload (the SST) is + // stored outside the storage engine to improve storage performance. // - // This is a raftpb.Entry of type EntryNormal whose data slice is either empty - // or whose first byte matches EntryEncodingSideloadedPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a + // This is a raftpb.Entry of type EntryNormal whose data slice is either + // empty or whose first byte matches + // entryEncodingSideloadedWithACPrefixByte. The subsequent eight bytes + // represent a CmdIDKey. The remaining bytes represent a // kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult holds a // nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data field of // which is an SST to be ingested (and which is present in memory but made // durable via direct storage on the filesystem, bypassing the storage - // engine). - EntryEncodingSideloaded EntryEncoding = 1 - // EntryEncodingEmpty is an empty entry. 
These are used by raft after - // leader election. Since they hold no data, there is nothing in them to - // decode. - EntryEncodingEmpty EntryEncoding = 253 + // engine). The kvserverpb.RaftCommand also includes below-raft admission + // control metadata. + EntryEncodingSideloadedWithAC + // EntryEncodingStandardWithoutAC is like EntryEncodingStandardWithAC but + // without below-raft admission metadata. + EntryEncodingStandardWithoutAC + // EntryEncodingSideloadedWithoutAC is like EntryEncodingSideloadedWithAC + // but without below-raft admission metadata. + EntryEncodingSideloadedWithoutAC // EntryEncodingRaftConfChange is a raftpb.Entry whose raftpb.EntryType is // raftpb.EntryConfChange. The Entry's Data field holds a raftpb.ConfChange // whose Context field is a kvserverpb.ConfChangeContext whose Payload is a // kvserverpb.RaftCommand. In particular, the CmdIDKey requires a round of // protobuf unmarshaling. - EntryEncodingRaftConfChange EntryEncoding = 254 - // EntryEncodingRaftConfChangeV2 is analogous to EntryEncodingRaftConfChange, with - // the replacements raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} - // applied. - EntryEncodingRaftConfChangeV2 EntryEncoding = 255 + EntryEncodingRaftConfChange + // EntryEncodingRaftConfChangeV2 is analogous to + // EntryEncodingRaftConfChange, with the replacements + // raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} applied. + EntryEncodingRaftConfChangeV2 +) + +// IsSideloaded returns true if the encoding is +// EntryEncodingSideloadedWith{,out}AC. +func (enc EntryEncoding) IsSideloaded() bool { + return enc == EntryEncodingSideloadedWithAC || enc == EntryEncodingSideloadedWithoutAC +} + +// UsesAdmissionControl returns true if the encoding is +// EntryEncoding{Standard,Sideloaded}WithAC.
+func (enc EntryEncoding) UsesAdmissionControl() bool { + return enc == EntryEncodingStandardWithAC || enc == EntryEncodingSideloadedWithAC +} + +// prefixByte returns the prefix byte used during encoding, applicable only to +// EntryEncoding{Standard,Sideloaded}With{,out}AC. +func (enc EntryEncoding) prefixByte() byte { + switch enc { + case EntryEncodingStandardWithAC: + return entryEncodingStandardWithACPrefixByte + case EntryEncodingSideloadedWithAC: + return entryEncodingSideloadedWithACPrefixByte + case EntryEncodingStandardWithoutAC: + return entryEncodingStandardWithoutACPrefixByte + case EntryEncodingSideloadedWithoutAC: + return entryEncodingSideloadedWithoutACPrefixByte + default: + panic(fmt.Sprintf("invalid encoding: %v has no prefix byte", enc)) + } +} + +const ( + // entryEncodingStandardWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithAC. + entryEncodingStandardWithACPrefixByte = byte(2) // 0b00000010 + // entryEncodingSideloadedWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithAC. + entryEncodingSideloadedWithACPrefixByte = byte(3) // 0b00000011 + // entryEncodingStandardWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithoutAC. + entryEncodingStandardWithoutACPrefixByte = byte(0) // 0b00000000 + // entryEncodingSideloadedWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithoutAC. + entryEncodingSideloadedWithoutACPrefixByte = byte(1) // 0b00000001 ) -// TODO(tbg): when we have a good library for encoding entries, these should -// no longer be exported. const ( // RaftCommandIDLen is the length of a command ID. 
RaftCommandIDLen = 8 - // RaftCommandPrefixLen is the length of the prefix of raft entries that - // use the EntryEncodingStandard or EntryEncodingSideloaded encodings. The - // bytes after the prefix represent the kvserverpb.RaftCommand. - // + // RaftCommandPrefixLen is the length of the prefix of raft entries that use + // the EntryEncoding{Standard,Sideloaded}With{,out}AC encodings. The bytes + // after the prefix represent the kvserverpb.RaftCommand. RaftCommandPrefixLen = 1 + RaftCommandIDLen - // EntryEncodingStandardPrefixByte is the first byte of a raftpb.Entry's - // Data slice for an Entry of encoding EntryEncodingStandard. - EntryEncodingStandardPrefixByte = byte(0) - // EntryEncodingSideloadedPrefixByte is the first byte of a raftpb.Entry's Data - // slice for an Entry of encoding EntryEncodingSideloaded. - EntryEncodingSideloadedPrefixByte = byte(1) ) -// EncodeRaftCommand encodes a raft command of type EntryEncodingStandard or -// EntryEncodingSideloaded. -func EncodeRaftCommand(prefixByte byte, commandID kvserverbase.CmdIDKey, command []byte) []byte { +// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using +// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). +func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte { b := make([]byte, RaftCommandPrefixLen+len(command)) - EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], prefixByte, commandID) + EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID) copy(b[RaftCommandPrefixLen:], command) return b } -// EncodeRaftCommandPrefix encodes the prefix for a Raft command of type -// EntryEncodingStandard or EntryEncodingSideloaded. -func EncodeRaftCommandPrefix(b []byte, prefixByte byte, commandID kvserverbase.CmdIDKey) { +// EncodeRaftCommandPrefix encodes the prefix for a Raft command, using the +// given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). 
+func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase.CmdIDKey) { if len(commandID) != RaftCommandIDLen { panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), RaftCommandIDLen)) } if len(b) != RaftCommandPrefixLen { panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), RaftCommandPrefixLen)) } - b[0] = prefixByte + b[0] = enc.prefixByte() copy(b[1:], commandID) } + +// DecodeRaftAdmissionMeta decodes admission control metadata from a +// raftpb.Entry.Data. Expects an EntryEncoding{Standard,Sideloaded}WithAC +// encoding. +func DecodeRaftAdmissionMeta(data []byte) (kvflowcontrolpb.RaftAdmissionMeta, error) { + prefix := data[0] + if !(prefix == entryEncodingStandardWithACPrefixByte || prefix == entryEncodingSideloadedWithACPrefixByte) { + panic(fmt.Sprintf("invalid encoding: prefix %v", prefix)) + } + + // TODO(irfansharif): If the decoding overhead is noticeable, we can write a + // custom decoder and rely on the encoding for raft admission data being + // present at the start of the marshaled raft command. This could speed it + // up slightly. + var raftAdmissionMeta kvflowcontrolpb.RaftAdmissionMeta + if err := protoutil.Unmarshal(data[1+RaftCommandIDLen:], &raftAdmissionMeta); err != nil { + return kvflowcontrolpb.RaftAdmissionMeta{}, err + } + return raftAdmissionMeta, nil +} diff --git a/pkg/kv/kvserver/raftlog/encoding_test.go b/pkg/kv/kvserver/raftlog/encoding_test.go new file mode 100644 index 000000000000..0e5734d9d5bd --- /dev/null +++ b/pkg/kv/kvserver/raftlog/encoding_test.go @@ -0,0 +1,107 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package raftlog + +import ( + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/require" + "go.etcd.io/raft/v3/raftpb" +) + +// BenchmarkRaftAdmissionMetaOverhead measures the overhead of encoding/decoding +// raft metadata, compared to not doing it. It's structured similar to how raft +// command data is encoded + decoded end-to-end, including the optional +// below-raft admission control data, where steps (2) and (4) below are only +// done for proposals subject to below-raft admission. +// +// name old time/op new time/op delta +// RaftAdmissionMetaOverhead/bytes=1.0_KiB,raft-ac-10 1.30µs ± 1% 1.70µs ± 1% +30.43% (p=0.008 n=5+5) +// RaftAdmissionMetaOverhead/bytes=256_KiB,raft-ac-10 51.6µs ± 4% 50.6µs ± 5% ~ (p=0.421 n=5+5) +// RaftAdmissionMetaOverhead/bytes=512_KiB,raft-ac-10 91.9µs ± 4% 91.2µs ± 5% ~ (p=1.000 n=5+5) +// RaftAdmissionMetaOverhead/bytes=1.0_MiB,raft-ac-10 148µs ± 4% 151µs ± 5% ~ (p=0.095 n=5+5) +// RaftAdmissionMetaOverhead/bytes=2.0_MiB,raft-ac-10 290µs ± 3% 292µs ± 1% ~ (p=0.151 n=5+5) +func BenchmarkRaftAdmissionMetaOverhead(b *testing.B) { + defer log.Scope(b).Close(b) + + const KiB = 1 << 10 + const MiB = 1 << 20 + + for _, withRaftAdmissionMeta := range []bool{false, true} { + for _, bytes := range []int64{1 * KiB, 256 * KiB, 512 * KiB, 1 * MiB, 2 * MiB} { + var raftAdmissionMetaLen int + var raftAdmissionMeta *kvflowcontrolpb.RaftAdmissionMeta + entryEnc := EntryEncodingStandardWithoutAC + + raftCmd := mkRaftCommand(100, int(bytes), int(bytes+200)) + marshaledRaftCmd, err := protoutil.Marshal(raftCmd) + require.NoError(b, err) + + if withRaftAdmissionMeta { + raftAdmissionMeta = &kvflowcontrolpb.RaftAdmissionMeta{ + 
AdmissionPriority: int32(admissionpb.BulkNormalPri), + AdmissionCreateTime: 18581258253, + } + raftAdmissionMetaLen = raftAdmissionMeta.Size() + entryEnc = EntryEncodingStandardWithAC + } + + encodingBuf := make([]byte, RaftCommandPrefixLen+raftAdmissionMeta.Size()+len(marshaledRaftCmd)) + raftEnt := Entry{ + Entry: raftpb.Entry{ + Term: 1, + Index: 1, + Type: raftpb.EntryNormal, + Data: encodingBuf, + }, + } + + b.Run(fmt.Sprintf("bytes=%s,raft-ac=%t", humanizeutil.IBytes(bytes), withRaftAdmissionMeta), + func(b *testing.B) { + for i := 0; i < b.N; i++ { + // 1. Encode the raft command prefix. + EncodeRaftCommandPrefix(encodingBuf[:RaftCommandPrefixLen], entryEnc, "deadbeef") + + // 2. If using below-raft admission, encode the raft + // metadata right after the command prefix. + if withRaftAdmissionMeta { + _, err = protoutil.MarshalTo( + raftAdmissionMeta, + encodingBuf[RaftCommandPrefixLen:RaftCommandPrefixLen+raftAdmissionMetaLen], + ) + require.NoError(b, err) + } + + // 3. Marshal the rest of the command. + _, err = protoutil.MarshalTo(raftCmd, encodingBuf[RaftCommandPrefixLen+raftAdmissionMetaLen:]) + require.NoError(b, err) + + // 4. If using below-raft admission, decode the raft + // metadata. + if withRaftAdmissionMeta { + _, err = DecodeRaftAdmissionMeta(encodingBuf) + require.NoError(b, err) + } + + // 5. Decode the entire raft command. 
+ require.NoError(b, raftEnt.load()) + } + }, + ) + } + } +} diff --git a/pkg/kv/kvserver/raftlog/entry.go b/pkg/kv/kvserver/raftlog/entry.go index d0d04ba2378d..f05d10baa3dd 100644 --- a/pkg/kv/kvserver/raftlog/entry.go +++ b/pkg/kv/kvserver/raftlog/entry.go @@ -36,36 +36,40 @@ func EncodingOf(ent raftpb.Entry) (EntryEncoding, error) { } switch ent.Type { - case raftpb.EntryNormal: case raftpb.EntryConfChange: return EntryEncodingRaftConfChange, nil case raftpb.EntryConfChangeV2: return EntryEncodingRaftConfChangeV2, nil + case raftpb.EntryNormal: default: return 0, errors.AssertionFailedf("unknown EntryType %d", ent.Type) } switch ent.Data[0] { - case EntryEncodingStandardPrefixByte: - return EntryEncodingStandard, nil - case EntryEncodingSideloadedPrefixByte: - return EntryEncodingSideloaded, nil + case entryEncodingStandardWithACPrefixByte: + return EntryEncodingStandardWithAC, nil + case entryEncodingSideloadedWithACPrefixByte: + return EntryEncodingSideloadedWithAC, nil + case entryEncodingStandardWithoutACPrefixByte: + return EntryEncodingStandardWithoutAC, nil + case entryEncodingSideloadedWithoutACPrefixByte: + return EntryEncodingSideloadedWithoutAC, nil default: return 0, errors.AssertionFailedf("unknown command encoding version %d", ent.Data[0]) } } -// DecomposeRaftVersionStandardOrSideloaded extracts the CmdIDKey and the -// marshaled kvserverpb.RaftCommand from a slice which is known to have come -// from a raftpb.Entry of type raftlog.EntryEncodingStandard or -// raftlog.EntryEncodingSideloaded (which, mod the prefix byte, share an -// encoding). -func DecomposeRaftVersionStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { +// DecomposeRaftEncodingStandardOrSideloaded extracts the CmdIDKey and the +// marshaled kvserverpb.RaftCommand from a slice known to have come from a +// raftpb.Entry with encoding EntryEncoding{Standard,Sideloaded}With{,out}AC. +// All these variants, mod the prefix byte, share an encoding.
+func DecomposeRaftEncodingStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { return kvserverbase.CmdIDKey(data[1 : 1+RaftCommandIDLen]), data[1+RaftCommandIDLen:] } // Entry contains data related to a raft log entry. This is the raftpb.Entry -// itself but also all encapsulated data relevant for command application. +// itself but also all encapsulated data relevant for command application and +// admission control. type Entry struct { raftpb.Entry ID kvserverbase.CmdIDKey // may be empty for zero Entry @@ -73,6 +77,10 @@ type Entry struct { ConfChangeV1 *raftpb.ConfChange // only set for config change ConfChangeV2 *raftpb.ConfChangeV2 // only set for config change ConfChangeContext *kvserverpb.ConfChangeContext // only set for config change + // ApplyAdmissionControl determines whether this entry is subject to + // replication admission control. Only applies for entries with encoding + // EntryEncoding{Standard,Sideloaded}WithAC. + ApplyAdmissionControl bool } var entryPool = sync.Pool{ @@ -145,8 +153,11 @@ func (e *Entry) load() error { AsV2() raftpb.ConfChangeV2 } switch typ { - case EntryEncodingStandard, EntryEncodingSideloaded: - e.ID, raftCmdBytes = DecomposeRaftVersionStandardOrSideloaded(e.Entry.Data) + case EntryEncodingStandardWithAC, EntryEncodingSideloadedWithAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) + e.ApplyAdmissionControl = true + case EntryEncodingStandardWithoutAC, EntryEncodingSideloadedWithoutAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) case EntryEncodingEmpty: // Nothing to load, the empty raftpb.Entry is represented by a trivial // Entry. 
diff --git a/pkg/kv/kvserver/raftlog/entry_test.go b/pkg/kv/kvserver/raftlog/entry_test.go index f7ac225e81c3..6893d4b3541c 100644 --- a/pkg/kv/kvserver/raftlog/entry_test.go +++ b/pkg/kv/kvserver/raftlog/entry_test.go @@ -25,7 +25,7 @@ func TestLoadInvalidEntry(t *testing.T) { Data: EncodeRaftCommand( // It would be nice to have an "even more invalid" command here but it // turns out that DecodeRaftCommand "handles" errors via panic(). - EntryEncodingStandardPrefixByte, "foobarzz", []byte("definitely not a protobuf"), + EntryEncodingStandardWithAC, "foobarzz", []byte("definitely not a protobuf"), ), } ent, err := NewEntry(invalidEnt) diff --git a/pkg/kv/kvserver/raftlog/iter_bench_test.go b/pkg/kv/kvserver/raftlog/iter_bench_test.go index 722510919d8f..adc0850b9c37 100644 --- a/pkg/kv/kvserver/raftlog/iter_bench_test.go +++ b/pkg/kv/kvserver/raftlog/iter_bench_test.go @@ -59,10 +59,9 @@ func (m *mockReader) NewMVCCIterator( return m.iter } -func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { +func mkRaftCommand(keySize, valSize, writeBatchSize int) *kvserverpb.RaftCommand { r := rand.New(rand.NewSource(123)) - // A realistic-ish raft command for a ~1kb write. 
- cmd := &kvserverpb.RaftCommand{ + return &kvserverpb.RaftCommand{ ProposerLeaseSequence: 1, MaxLeaseIndex: 1159192591, ClosedTimestamp: &hlc.Timestamp{WallTime: 12512591925, Logical: 1}, @@ -79,20 +78,25 @@ func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { }, RaftLogDelta: 1300, }, - WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, 2000)}, + WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, writeBatchSize)}, LogicalOpLog: &kvserverpb.LogicalOpLog{Ops: []enginepb.MVCCLogicalOp{ { WriteValue: &enginepb.MVCCWriteValueOp{ - Key: roachpb.Key(randutil.RandBytes(r, 100)), + Key: roachpb.Key(randutil.RandBytes(r, keySize)), Timestamp: hlc.Timestamp{WallTime: 1284581285}, - Value: roachpb.Key(randutil.RandBytes(r, 1800)), + Value: roachpb.Key(randutil.RandBytes(r, valSize)), }, }, }}, } +} + +func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { + // A realistic-ish raft command for a ~1kb write. + cmd := mkRaftCommand(100, 1800, 2000) cmdB, err := protoutil.Marshal(cmd) require.NoError(b, err) - data := EncodeRaftCommand(EntryEncodingStandardPrefixByte, "cmd12345", cmdB) + data := EncodeRaftCommand(EntryEncodingStandardWithoutAC, "cmd12345", cmdB) ent := raftpb.Entry{ Term: 1, diff --git a/pkg/kv/kvserver/raftlog/iter_test.go b/pkg/kv/kvserver/raftlog/iter_test.go index 189c4f965f95..6d372e654885 100644 --- a/pkg/kv/kvserver/raftlog/iter_test.go +++ b/pkg/kv/kvserver/raftlog/iter_test.go @@ -44,11 +44,11 @@ func ents(inds ...uint64) []raftpb.Entry { typ := raftpb.EntryType(ind % 3) switch typ { case raftpb.EntryNormal: - prefixByte := EntryEncodingStandardPrefixByte + enc := EntryEncodingStandardWithAC if ind%2 == 0 { - prefixByte = EntryEncodingSideloadedPrefixByte + enc = EntryEncodingSideloadedWithAC } - data = EncodeRaftCommand(prefixByte, cmdID, b) + data = EncodeRaftCommand(enc, cmdID, b) case raftpb.EntryConfChangeV2: c := kvserverpb.ConfChangeContext{ CommandID: string(cmdID), diff --git 
a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 05be5ad0d3da..5b872d6e2771 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -303,7 +303,7 @@ func (pc proposalCreator) encodeProposal(p *ProposalData) []byte { cmdLen := p.command.Size() needed := raftlog.RaftCommandPrefixLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize() data := make([]byte, raftlog.RaftCommandPrefixLen, needed) - raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardWithoutAC, p.idKey) data = data[:raftlog.RaftCommandPrefixLen+p.command.Size()] if _, err := protoutil.MarshalTo(p.command, data[raftlog.RaftCommandPrefixLen:]); err != nil { panic(err) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index e6f8b4369a93..690c2e72445b 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -344,7 +344,7 @@ func (r *Replica) propose( // Determine the encoding style for the Raft command. prefix := true - encodingPrefixByte := raftlog.EntryEncodingStandardPrefixByte + entryEncoding := raftlog.EntryEncodingStandardWithoutAC if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil { // EndTxnRequest with a ChangeReplicasTrigger is special because Raft // needs to understand it; it cannot simply be an opaque command. 
To @@ -416,7 +416,7 @@ func (r *Replica) propose( } } else if p.command.ReplicatedEvalResult.AddSSTable != nil { log.VEvent(p.ctx, 4, "sideloadable proposal detected") - encodingPrefixByte = raftlog.EntryEncodingSideloadedPrefixByte + entryEncoding = raftlog.EntryEncodingSideloadedWithoutAC r.store.metrics.AddSSTableProposals.Inc(1) if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { @@ -438,7 +438,7 @@ func (r *Replica) propose( data := make([]byte, preLen, needed) // Encode prefix with command ID, if necessary. if prefix { - raftlog.EncodeRaftCommandPrefix(data, encodingPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, entryEncoding, p.idKey) } // Encode body of command. data = data[:preLen+cmdLen] diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 1fb80c595813..147ee7447721 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -87,7 +87,7 @@ func (r *Replica) maybeUnquiesceAndWakeLeaderLocked() bool { r.store.unquiescedReplicas.Unlock() r.maybeCampaignOnWakeLocked(ctx) // Propose an empty command which will wake the leader. - data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, makeIDKey(), nil) + data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil) _ = r.mu.internalRaftGroup.Propose(data) return true }