From de5fbabd804b729b9a544244630a3aeaac72e889 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Wed, 30 Nov 2022 16:35:55 -0500 Subject: [PATCH 1/2] insights: record insights when txn contention meets threshold Part of #90393 Previously, we only write a transaction and its statements to the insights system when there is an issue detected for one of the statements in the transaction. However, a transaction should still report high contention when the total amount of contention time is high, even if the contention experienced by each of its statements is not over the threshold. This commit ensures that transactions which have a recorded contention time above the insights latency threshold get reported to the insights system. Release note: None --- pkg/sql/sqlstats/insights/registry.go | 18 +++++++++- pkg/sql/sqlstats/insights/registry_test.go | 42 ++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/pkg/sql/sqlstats/insights/registry.go b/pkg/sql/sqlstats/insights/registry.go index bcc0bff965d1..9df960f5caed 100644 --- a/pkg/sql/sqlstats/insights/registry.go +++ b/pkg/sql/sqlstats/insights/registry.go @@ -103,13 +103,29 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac slowStatements.Add(i) } } - if slowStatements.Empty() { + + // So far this is the only case when a transaction is considered slow. + // In the future, we may want to make a detector for transactions if there + // are more cases. + highContention := false + if transaction.Contention != nil { + highContention = transaction.Contention.Seconds() >= LatencyThreshold.Get(&r.causes.st.SV).Seconds() + } + + if slowStatements.Empty() && !highContention { + // We only record an insight if we have slow statements or high txn contention. return } + // Note that we'll record insights for every statement, not just for // the slow ones. insight := makeInsight(sessionID, transaction) + if highContention { + insight.Transaction.Problems = addProblem(insight.Transaction.Problems, Problem_SlowExecution) + insight.Transaction.Causes = addCause(insight.Transaction.Causes, Cause_HighContention) + } + for i, s := range *statements { if slowStatements.Contains(i) { switch s.Status { diff --git a/pkg/sql/sqlstats/insights/registry_test.go b/pkg/sql/sqlstats/insights/registry_test.go index e11519c7005b..573888699024 100644 --- a/pkg/sql/sqlstats/insights/registry_test.go +++ b/pkg/sql/sqlstats/insights/registry_test.go @@ -254,4 +254,46 @@ func TestRegistry(t *testing.T) { registry := newRegistry(st, &latencyThresholdDetector{st: st}, newStore(st)) require.NotPanics(t, func() { registry.ObserveTransaction(session.ID, transaction) }) }) + + t.Run("txn with high accumulated contention without high single stmt contention", func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + store := newStore(st) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + contentionDuration := 10 * time.Second + statement := &Statement{ + Status: Statement_Completed, + ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")), + FingerprintID: roachpb.StmtFingerprintID(100), + LatencyInSeconds: 0.00001, + } + txnHighContention := &Transaction{ID: uuid.FastMakeV4(), Contention: &contentionDuration} + + registry.ObserveStatement(session.ID, statement) + registry.ObserveTransaction(session.ID, txnHighContention) + + expected := []*Insight{ + { + Session: session, + Transaction: &Transaction{ + ID: txnHighContention.ID, + Contention: &contentionDuration, + StmtExecutionIDs: txnHighContention.StmtExecutionIDs, + Problems: []Problem{Problem_SlowExecution}, + Causes: []Cause{Cause_HighContention}}, + Statements: []*Statement{ + newStmtWithProblemAndCauses(statement, Problem_None, nil), + }, + }, + } + + var actual []*Insight + store.IterateInsights( + context.Background(), + func(ctx context.Context, o *Insight) { + actual = append(actual, o) + }, + ) + + require.Equal(t, expected, actual) + }) } From 4df47f54f042355a7b2231a9efc15aa1857d81d5 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 24 Jan 2023 09:37:21 -0500 Subject: [PATCH 2/2] raftlog: introduce EntryEncoding{Standard,Sideloaded}WithAC Part of #95563. Predecessor to #95637. This commit introduces two new encodings for raft log entries, EntryEncoding{Standard,Sideloaded}WithAC. Raft log entries have prefix byte that informs decoding routines how to interpret the subsequent bytes. To date we've had two, EntryEncoding{Standard,Sideloaded}[^1], to indicate whether the entry came with sideloaded data[^2]. Our two additions here will be used to indicate whether the particular entry is subject to replication admission control. If so, right as we persist entries into the raft log storage, we'll "admit the work without blocking", which is further explained in #95637. The decision to use replication admission control happens above raft and a per-entry basis. If using replication admission control, AC-specific metadata will be plumbed down as part of the marshaled raft command. This too is explained in in #95637, specifically, the 'RaftAdmissionMeta' section. When using these encodings in the future, we'll need to tied it to a version gate since we're using a prefix byte for raft commands one that's recognized in earlier CRDB versions. [^1]: Now renamed to EntryEncoding{Standard,Sideloaded}WithoutAC. [^2]: These are typically AddSSTs, the storage for which is treated differently for performance reasons. Release note: None --- pkg/kv/kvserver/logstore/logstore.go | 4 +- .../kvserver/logstore/logstore_bench_test.go | 2 +- pkg/kv/kvserver/logstore/sideload.go | 10 +- pkg/kv/kvserver/logstore/sideload_test.go | 16 +- .../kvserver/loqrecovery/recovery_env_test.go | 2 +- pkg/kv/kvserver/raft.go | 13 +- pkg/kv/kvserver/raftlog/encoding.go | 145 ++++++++++++------ pkg/kv/kvserver/raftlog/entry.go | 39 +++-- pkg/kv/kvserver/raftlog/entry_test.go | 2 +- pkg/kv/kvserver/raftlog/iter_bench_test.go | 18 ++- pkg/kv/kvserver/raftlog/iter_test.go | 6 +- pkg/kv/kvserver/replica_proposal_buf_test.go | 2 +- pkg/kv/kvserver/replica_raft.go | 6 +- pkg/kv/kvserver/replica_raft_quiesce.go | 2 +- pkg/kv/kvserver/replica_sideload_test.go | 2 +- 15 files changed, 166 insertions(+), 103 deletions(-) diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index e53e9c17f756..f1784431b0f5 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -328,7 +328,7 @@ func LoadTerm( if err != nil { return 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */) } return entry.Term, nil @@ -405,7 +405,7 @@ func LoadEntries( if err != nil { return err } - if typ == raftlog.EntryEncodingSideloaded { + if typ.IsSideloaded() { newEnt, err := MaybeInlineSideloadedRaftCommand( ctx, rangeID, ent, sideloaded, eCache, ) diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go index e2e31c257ff8..0c3f8e0473e2 100644 --- a/pkg/kv/kvserver/logstore/logstore_bench_test.go +++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go @@ -76,7 +76,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { Term: 1, Index: 1, Type: raftpb.EntryNormal, - Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, "deadbeef", data), + Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data), }) stats := &AppendStats{} diff --git a/pkg/kv/kvserver/logstore/sideload.go b/pkg/kv/kvserver/logstore/sideload.go index 9b6f60f0e899..2c21a24fae44 100644 --- a/pkg/kv/kvserver/logstore/sideload.go +++ b/pkg/kv/kvserver/logstore/sideload.go @@ -86,7 +86,7 @@ func MaybeSideloadEntries( if err != nil { return nil, 0, 0, 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { otherEntriesSize += int64(len(input[i].Data)) continue } @@ -124,7 +124,7 @@ func MaybeSideloadEntries( // TODO(tbg): this should be supported by a method as well. { data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command") @@ -165,7 +165,7 @@ func MaybeInlineSideloadedRaftCommand( if err != nil { return nil, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return nil, nil } log.Event(ctx, "inlining sideloaded SSTable") @@ -213,7 +213,7 @@ func MaybeInlineSideloadedRaftCommand( // the EntryEncoding. { data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, err @@ -232,7 +232,7 @@ func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry) if err != nil { log.Fatalf(ctx, "%v", err) } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return } diff --git a/pkg/kv/kvserver/logstore/sideload_test.go b/pkg/kv/kvserver/logstore/sideload_test.go index 65f98509af02..800ff9b9ce7b 100644 --- a/pkg/kv/kvserver/logstore/sideload_test.go +++ b/pkg/kv/kvserver/logstore/sideload_test.go @@ -51,7 +51,7 @@ func mustEntryEq(t testing.TB, l, r raftpb.Entry) { } func mkEnt( - v byte, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, + enc raftlog.EntryEncoding, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, ) raftpb.Entry { cmdIDKey := strings.Repeat("x", raftlog.RaftCommandIDLen) var cmd kvserverpb.RaftCommand @@ -62,7 +62,7 @@ func mkEnt( } var ent raftpb.Entry ent.Index, ent.Term = index, term - ent.Data = raftlog.EncodeRaftCommand(v, kvserverbase.CmdIDKey(cmdIDKey), b) + ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b) return ent } @@ -355,7 +355,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - v1, v2 := raftlog.EntryEncodingStandardPrefixByte, raftlog.EntryEncodingSideloadedPrefixByte + v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC rangeID := roachpb.RangeID(1) type testCase struct { @@ -477,11 +477,11 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { addSSTStripped := addSST addSSTStripped.Data = nil - entV1Reg := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 10, 99, nil) - entV1SST := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 11, 99, &addSST) - entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 12, 99, nil) - entV2SST := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSST) - entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSSTStripped) + entV1Reg := mkEnt(raftlog.EntryEncodingStandardWithAC, 10, 99, nil) + entV1SST := mkEnt(raftlog.EntryEncodingStandardWithAC, 11, 99, &addSST) + entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 12, 99, nil) + entV2SST := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSST) + entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSSTStripped) type tc struct { name string diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go index 68036eff3932..2bbf90e877ae 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go @@ -381,7 +381,7 @@ func raftLogFromPendingDescriptorUpdate( t.Fatalf("failed to serialize raftCommand: %v", err) } data := raftlog.EncodeRaftCommand( - raftlog.EntryEncodingStandardPrefixByte, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) + raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) ent := raftpb.Entry{ Term: 1, Index: replica.RaftCommittedIndex + entryIndex, diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 8ba284ad6e06..6e1b6fdb62a1 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -221,9 +221,9 @@ func raftEntryFormatter(data []byte) string { } // NB: a raft.EntryFormatter is only invoked for EntryNormal (raft methods // that call this take care of unwrapping the ConfChange), and since - // len(data)>0 it has to be EntryEncodingStandard or EntryEncodingSideloaded and - // they are encoded identically. - cmdID, data := raftlog.DecomposeRaftVersionStandardOrSideloaded(data) + // len(data)>0 it has to be {Deprecated,}EntryEncoding{Standard,Sideloaded} + // and they are encoded identically. + cmdID, data := raftlog.DecomposeRaftEncodingStandardOrSideloaded(data) return fmt.Sprintf("[%x] [%d]", cmdID, len(data)) } @@ -274,8 +274,11 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase continue } switch typ { - case raftlog.EntryEncodingStandard, raftlog.EntryEncodingSideloaded: - id, _ := raftlog.DecomposeRaftVersionStandardOrSideloaded(e.Data) + case raftlog.EntryEncodingStandardWithAC, + raftlog.EntryEncodingSideloadedWithAC, + raftlog.EntryEncodingStandardWithoutAC, + raftlog.EntryEncodingSideloadedWithoutAC: + id, _ := raftlog.DecomposeRaftEncodingStandardOrSideloaded(e.Data) ids = append(ids, id) case raftlog.EntryEncodingRaftConfChange, raftlog.EntryEncodingRaftConfChangeV2: // Configuration changes don't have the CmdIDKey easily accessible but are diff --git a/pkg/kv/kvserver/raftlog/encoding.go b/pkg/kv/kvserver/raftlog/encoding.go index dd52176a0607..bf312d57f233 100644 --- a/pkg/kv/kvserver/raftlog/encoding.go +++ b/pkg/kv/kvserver/raftlog/encoding.go @@ -23,83 +23,128 @@ import ( // and, in some cases, the first byte of the Entry's Data payload. type EntryEncoding byte -// TODO(tbg): use auto-assigned consts (iota) for the encodings below, since -// they aren't on the wire. - const ( - // EntryEncodingStandard is the default encoding for a CockroachDB raft log - // entry. - // - // This is a raftpb.Entry of type EntryNormal whose Data slice is either empty - // or whose first byte matches EntryEncodingStandardPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a - // kvserverpb.RaftCommand. - EntryEncodingStandard EntryEncoding = 0 - // EntryEncodingSideloaded indicates a proposal representing the result of a - // roachpb.AddSSTableRequest for which the payload (the SST) is stored outside - // the storage engine to improve storage performance. - // - // This is a raftpb.Entry of type EntryNormal whose data slice is either empty - // or whose first byte matches EntryEncodingSideloadedPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a - // kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult holds a - // nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data field of - // which is an SST to be ingested (and which is present in memory but made - // durable via direct storage on the filesystem, bypassing the storage - // engine). - EntryEncodingSideloaded EntryEncoding = 1 // EntryEncodingEmpty is an empty entry. These are used by raft after // leader election. Since they hold no data, there is nothing in them to // decode. - EntryEncodingEmpty EntryEncoding = 253 + EntryEncodingEmpty EntryEncoding = iota + // EntryEncodingStandardWithAC is the default encoding for a CockroachDB + // raft log entry. + // + // This is a raftpb.Entry of type EntryNormal whose Data slice is either + // empty or whose first byte matches entryEncodingStandardWithACPrefixByte. + // The subsequent eight bytes represent a CmdIDKey. The remaining bytes + // represent a kvserverpb.RaftCommand that also includes data used for + // below-raft admission control (Admission{Priority,CreateTime,OriginNode}). + EntryEncodingStandardWithAC + // EntryEncodingSideloadedWithAC indicates a proposal representing the + // result of a roachpb.AddSSTableRequest for which the payload (the SST) is + // stored outside the storage engine to improve storage performance. + // + // This is a raftpb.Entry of type EntryNormal whose data slice is either + // empty or with first byte == entryEncodingSideloadedWithACPrefixByte. The + // subsequent eight bytes represent a CmdIDKey. The remaining bytes + // represent a kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult + // holds a nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data + // field of which is an SST to be ingested (and which is present in memory + // but made durable via direct storage on the filesystem, bypassing the + // storage engine). Admission{Priority,CreateTime,OriginNode} in the + // kvserverpb.RaftCommand are non-empty, data used for below-raft admission + // control. + EntryEncodingSideloadedWithAC + // EntryEncodingStandardWithoutAC is like EntryEncodingStandardWithAC but + // without the data for below-raft admission control. + EntryEncodingStandardWithoutAC + // EntryEncodingSideloadedWithoutAC is like EntryEncodingStandardWithoutAC + // but without below-raft admission metadata. + EntryEncodingSideloadedWithoutAC // EntryEncodingRaftConfChange is a raftpb.Entry whose raftpb.EntryType is // raftpb.EntryConfChange. The Entry's Data field holds a raftpb.ConfChange // whose Context field is a kvserverpb.ConfChangeContext whose Payload is a // kvserverpb.RaftCommand. In particular, the CmdIDKey requires a round of // protobuf unmarshaling. - EntryEncodingRaftConfChange EntryEncoding = 254 - // EntryEncodingRaftConfChangeV2 is analogous to EntryEncodingRaftConfChange, with - // the replacements raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} - // applied. - EntryEncodingRaftConfChangeV2 EntryEncoding = 255 + EntryEncodingRaftConfChange + // EntryEncodingRaftConfChangeV2 is analogous to + // EntryEncodingRaftConfChange, with the replacements + // raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} applied. + EntryEncodingRaftConfChangeV2 +) + +// IsSideloaded returns true if the encoding is +// EntryEncodingSideloadedWith{,out}AC. +func (enc EntryEncoding) IsSideloaded() bool { + return enc == EntryEncodingSideloadedWithAC || enc == EntryEncodingSideloadedWithoutAC +} + +// UsesAdmissionControl returns true if the encoding is +// EntryEncoding{Standard,Sideloaded}WithAC. +func (enc EntryEncoding) UsesAdmissionControl() bool { + return enc == EntryEncodingStandardWithAC || enc == EntryEncodingSideloadedWithAC +} + +// prefixByte returns the prefix byte used during encoding, applicable only to +// EntryEncoding{Standard,Sideloaded}With{,out}AC. +func (enc EntryEncoding) prefixByte() byte { + switch enc { + case EntryEncodingStandardWithAC: + return entryEncodingStandardWithACPrefixByte + case EntryEncodingSideloadedWithAC: + return entryEncodingSideloadedWithACPrefixByte + case EntryEncodingStandardWithoutAC: + return entryEncodingStandardWithoutACPrefixByte + case EntryEncodingSideloadedWithoutAC: + return entryEncodingSideloadedWithoutACPrefixByte + default: + panic(fmt.Sprintf("invalid encoding: %v has no prefix byte", enc)) + } +} + +const ( + // entryEncodingStandardWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithAC. + entryEncodingStandardWithACPrefixByte = byte(2) // 0b00000010 + // entryEncodingSideloadedWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithAC. + entryEncodingSideloadedWithACPrefixByte = byte(3) // 0b00000011 + // entryEncodingStandardWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithoutAC. + entryEncodingStandardWithoutACPrefixByte = byte(0) // 0b00000000 + // entryEncodingSideloadedWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithoutAC. + entryEncodingSideloadedWithoutACPrefixByte = byte(1) // 0b00000001 ) -// TODO(tbg): when we have a good library for encoding entries, these should -// no longer be exported. const ( // RaftCommandIDLen is the length of a command ID. RaftCommandIDLen = 8 - // RaftCommandPrefixLen is the length of the prefix of raft entries that - // use the EntryEncodingStandard or EntryEncodingSideloaded encodings. The - // bytes after the prefix represent the kvserverpb.RaftCommand. - // + // RaftCommandPrefixLen is the length of the prefix of raft entries that use + // the EntryEncoding{Standard,Sideloaded}With{,out}AC encodings. The bytes + // after the prefix represent the kvserverpb.RaftCommand. RaftCommandPrefixLen = 1 + RaftCommandIDLen - // EntryEncodingStandardPrefixByte is the first byte of a raftpb.Entry's - // Data slice for an Entry of encoding EntryEncodingStandard. - EntryEncodingStandardPrefixByte = byte(0) - // EntryEncodingSideloadedPrefixByte is the first byte of a raftpb.Entry's Data - // slice for an Entry of encoding EntryEncodingSideloaded. - EntryEncodingSideloadedPrefixByte = byte(1) ) -// EncodeRaftCommand encodes a raft command of type EntryEncodingStandard or -// EntryEncodingSideloaded. -func EncodeRaftCommand(prefixByte byte, commandID kvserverbase.CmdIDKey, command []byte) []byte { +// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using +// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). +func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte { b := make([]byte, RaftCommandPrefixLen+len(command)) - EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], prefixByte, commandID) + EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID) copy(b[RaftCommandPrefixLen:], command) return b } -// EncodeRaftCommandPrefix encodes the prefix for a Raft command of type -// EntryEncodingStandard or EntryEncodingSideloaded. -func EncodeRaftCommandPrefix(b []byte, prefixByte byte, commandID kvserverbase.CmdIDKey) { +// EncodeRaftCommandPrefix encodes the prefix for a Raft command, using the +// given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). +func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase.CmdIDKey) { if len(commandID) != RaftCommandIDLen { panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), RaftCommandIDLen)) } if len(b) != RaftCommandPrefixLen { panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), RaftCommandPrefixLen)) } - b[0] = prefixByte + b[0] = enc.prefixByte() copy(b[1:], commandID) } diff --git a/pkg/kv/kvserver/raftlog/entry.go b/pkg/kv/kvserver/raftlog/entry.go index d0d04ba2378d..f05d10baa3dd 100644 --- a/pkg/kv/kvserver/raftlog/entry.go +++ b/pkg/kv/kvserver/raftlog/entry.go @@ -36,36 +36,40 @@ func EncodingOf(ent raftpb.Entry) (EntryEncoding, error) { } switch ent.Type { - case raftpb.EntryNormal: case raftpb.EntryConfChange: return EntryEncodingRaftConfChange, nil case raftpb.EntryConfChangeV2: return EntryEncodingRaftConfChangeV2, nil + case raftpb.EntryNormal: default: return 0, errors.AssertionFailedf("unknown EntryType %d", ent.Type) } switch ent.Data[0] { - case EntryEncodingStandardPrefixByte: - return EntryEncodingStandard, nil - case EntryEncodingSideloadedPrefixByte: - return EntryEncodingSideloaded, nil + case entryEncodingStandardWithACPrefixByte: + return EntryEncodingStandardWithAC, nil + case entryEncodingSideloadedWithACPrefixByte: + return EntryEncodingSideloadedWithAC, nil + case entryEncodingStandardWithoutACPrefixByte: + return EntryEncodingStandardWithoutAC, nil + case entryEncodingSideloadedWithoutACPrefixByte: + return EntryEncodingSideloadedWithoutAC, nil default: return 0, errors.AssertionFailedf("unknown command encoding version %d", ent.Data[0]) } } -// DecomposeRaftVersionStandardOrSideloaded extracts the CmdIDKey and the -// marshaled kvserverpb.RaftCommand from a slice which is known to have come -// from a raftpb.Entry of type raftlog.EntryEncodingStandard or -// raftlog.EntryEncodingSideloaded (which, mod the prefix byte, share an -// encoding). -func DecomposeRaftVersionStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { +// DecomposeRaftEncodingStandardOrSideloaded extracts the CmdIDKey and the +// marshaled kvserverpb.RaftCommand from a raftpb.Entry slice known to have +// Entry with type EntryEncoding{Standard,Sideloaded}With{,out}AC. +// All these variants, mod the prefix byte, share an encoding. +func DecomposeRaftEncodingStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { return kvserverbase.CmdIDKey(data[1 : 1+RaftCommandIDLen]), data[1+RaftCommandIDLen:] } // Entry contains data related to a raft log entry. This is the raftpb.Entry -// itself but also all encapsulated data relevant for command application. +// itself but also all encapsulated data relevant for command application and +// admission control. type Entry struct { raftpb.Entry ID kvserverbase.CmdIDKey // may be empty for zero Entry @@ -73,6 +77,10 @@ type Entry struct { ConfChangeV1 *raftpb.ConfChange // only set for config change ConfChangeV2 *raftpb.ConfChangeV2 // only set for config change ConfChangeContext *kvserverpb.ConfChangeContext // only set for config change + // ApplyAdmissionControl determines whether this entry is subject to + // replication admission control. Only applies for entries with encoding + // EntryEncoding{Standard,Sideloaded}WithAC. + ApplyAdmissionControl bool } var entryPool = sync.Pool{ @@ -145,8 +153,11 @@ func (e *Entry) load() error { AsV2() raftpb.ConfChangeV2 } switch typ { - case EntryEncodingStandard, EntryEncodingSideloaded: - e.ID, raftCmdBytes = DecomposeRaftVersionStandardOrSideloaded(e.Entry.Data) + case EntryEncodingStandardWithAC, EntryEncodingSideloadedWithAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) + e.ApplyAdmissionControl = true + case EntryEncodingStandardWithoutAC, EntryEncodingSideloadedWithoutAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) case EntryEncodingEmpty: // Nothing to load, the empty raftpb.Entry is represented by a trivial // Entry. diff --git a/pkg/kv/kvserver/raftlog/entry_test.go b/pkg/kv/kvserver/raftlog/entry_test.go index f7ac225e81c3..6893d4b3541c 100644 --- a/pkg/kv/kvserver/raftlog/entry_test.go +++ b/pkg/kv/kvserver/raftlog/entry_test.go @@ -25,7 +25,7 @@ func TestLoadInvalidEntry(t *testing.T) { Data: EncodeRaftCommand( // It would be nice to have an "even more invalid" command here but it // turns out that DecodeRaftCommand "handles" errors via panic(). - EntryEncodingStandardPrefixByte, "foobarzz", []byte("definitely not a protobuf"), + EntryEncodingStandardWithAC, "foobarzz", []byte("definitely not a protobuf"), ), } ent, err := NewEntry(invalidEnt) diff --git a/pkg/kv/kvserver/raftlog/iter_bench_test.go b/pkg/kv/kvserver/raftlog/iter_bench_test.go index 722510919d8f..adc0850b9c37 100644 --- a/pkg/kv/kvserver/raftlog/iter_bench_test.go +++ b/pkg/kv/kvserver/raftlog/iter_bench_test.go @@ -59,10 +59,9 @@ func (m *mockReader) NewMVCCIterator( return m.iter } -func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { +func mkRaftCommand(keySize, valSize, writeBatchSize int) *kvserverpb.RaftCommand { r := rand.New(rand.NewSource(123)) - // A realistic-ish raft command for a ~1kb write. - cmd := &kvserverpb.RaftCommand{ + return &kvserverpb.RaftCommand{ ProposerLeaseSequence: 1, MaxLeaseIndex: 1159192591, ClosedTimestamp: &hlc.Timestamp{WallTime: 12512591925, Logical: 1}, @@ -79,20 +78,25 @@ func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { }, RaftLogDelta: 1300, }, - WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, 2000)}, + WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, writeBatchSize)}, LogicalOpLog: &kvserverpb.LogicalOpLog{Ops: []enginepb.MVCCLogicalOp{ { WriteValue: &enginepb.MVCCWriteValueOp{ - Key: roachpb.Key(randutil.RandBytes(r, 100)), + Key: roachpb.Key(randutil.RandBytes(r, keySize)), Timestamp: hlc.Timestamp{WallTime: 1284581285}, - Value: roachpb.Key(randutil.RandBytes(r, 1800)), + Value: roachpb.Key(randutil.RandBytes(r, valSize)), }, }, }}, } +} + +func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { + // A realistic-ish raft command for a ~1kb write. + cmd := mkRaftCommand(100, 1800, 2000) cmdB, err := protoutil.Marshal(cmd) require.NoError(b, err) - data := EncodeRaftCommand(EntryEncodingStandardPrefixByte, "cmd12345", cmdB) + data := EncodeRaftCommand(EntryEncodingStandardWithoutAC, "cmd12345", cmdB) ent := raftpb.Entry{ Term: 1, diff --git a/pkg/kv/kvserver/raftlog/iter_test.go b/pkg/kv/kvserver/raftlog/iter_test.go index 189c4f965f95..6d372e654885 100644 --- a/pkg/kv/kvserver/raftlog/iter_test.go +++ b/pkg/kv/kvserver/raftlog/iter_test.go @@ -44,11 +44,11 @@ func ents(inds ...uint64) []raftpb.Entry { typ := raftpb.EntryType(ind % 3) switch typ { case raftpb.EntryNormal: - prefixByte := EntryEncodingStandardPrefixByte + enc := EntryEncodingStandardWithAC if ind%2 == 0 { - prefixByte = EntryEncodingSideloadedPrefixByte + enc = EntryEncodingSideloadedWithAC } - data = EncodeRaftCommand(prefixByte, cmdID, b) + data = EncodeRaftCommand(enc, cmdID, b) case raftpb.EntryConfChangeV2: c := kvserverpb.ConfChangeContext{ CommandID: string(cmdID), diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 05be5ad0d3da..5b872d6e2771 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -303,7 +303,7 @@ func (pc proposalCreator) encodeProposal(p *ProposalData) []byte { cmdLen := p.command.Size() needed := raftlog.RaftCommandPrefixLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize() data := make([]byte, raftlog.RaftCommandPrefixLen, needed) - raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardWithoutAC, p.idKey) data = data[:raftlog.RaftCommandPrefixLen+p.command.Size()] if _, err := protoutil.MarshalTo(p.command, data[raftlog.RaftCommandPrefixLen:]); err != nil { panic(err) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ea9ed9da0700..5b11de2dcd98 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -344,7 +344,7 @@ func (r *Replica) propose( // Determine the encoding style for the Raft command. prefix := true - encodingPrefixByte := raftlog.EntryEncodingStandardPrefixByte + entryEncoding := raftlog.EntryEncodingStandardWithoutAC if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil { // EndTxnRequest with a ChangeReplicasTrigger is special because Raft // needs to understand it; it cannot simply be an opaque command. To @@ -416,7 +416,7 @@ func (r *Replica) propose( } } else if p.command.ReplicatedEvalResult.AddSSTable != nil { log.VEvent(p.ctx, 4, "sideloadable proposal detected") - encodingPrefixByte = raftlog.EntryEncodingSideloadedPrefixByte + entryEncoding = raftlog.EntryEncodingSideloadedWithoutAC r.store.metrics.AddSSTableProposals.Inc(1) if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { @@ -438,7 +438,7 @@ func (r *Replica) propose( data := make([]byte, preLen, needed) // Encode prefix with command ID, if necessary. if prefix { - raftlog.EncodeRaftCommandPrefix(data, encodingPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, entryEncoding, p.idKey) } // Encode body of command. data = data[:preLen+cmdLen] diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 1fb80c595813..147ee7447721 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -87,7 +87,7 @@ func (r *Replica) maybeUnquiesceAndWakeLeaderLocked() bool { r.store.unquiescedReplicas.Unlock() r.maybeCampaignOnWakeLocked(ctx) // Propose an empty command which will wake the leader. - data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, makeIDKey(), nil) + data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil) _ = r.mu.internalRaftGroup.Propose(data) return true } diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index 2c7de2197518..74501c638a78 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -199,7 +199,7 @@ func TestRaftSSTableSideloading(t *testing.T) { var idx int for idx = 0; idx < len(ents); idx++ { // Get the SST back from the raft log. - if typ, _ := raftlog.EncodingOf(ents[idx]); typ != raftlog.EntryEncodingSideloaded { + if typ, _ := raftlog.EncodingOf(ents[idx]); !typ.IsSideloaded() { continue } ent, err := logstore.MaybeInlineSideloadedRaftCommand(ctx, tc.repl.RangeID, ents[idx], tc.repl.raftMu.sideloaded, tc.store.raftEntryCache)