diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index e53e9c17f756..f1784431b0f5 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -328,7 +328,7 @@ func LoadTerm( if err != nil { return 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */) } return entry.Term, nil @@ -405,7 +405,7 @@ func LoadEntries( if err != nil { return err } - if typ == raftlog.EntryEncodingSideloaded { + if typ.IsSideloaded() { newEnt, err := MaybeInlineSideloadedRaftCommand( ctx, rangeID, ent, sideloaded, eCache, ) diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go index e2e31c257ff8..0c3f8e0473e2 100644 --- a/pkg/kv/kvserver/logstore/logstore_bench_test.go +++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go @@ -76,7 +76,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { Term: 1, Index: 1, Type: raftpb.EntryNormal, - Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, "deadbeef", data), + Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data), }) stats := &AppendStats{} diff --git a/pkg/kv/kvserver/logstore/sideload.go b/pkg/kv/kvserver/logstore/sideload.go index 9b6f60f0e899..2c21a24fae44 100644 --- a/pkg/kv/kvserver/logstore/sideload.go +++ b/pkg/kv/kvserver/logstore/sideload.go @@ -86,7 +86,7 @@ func MaybeSideloadEntries( if err != nil { return nil, 0, 0, 0, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { otherEntriesSize += int64(len(input[i].Data)) continue } @@ -124,7 +124,7 @@ func MaybeSideloadEntries( // TODO(tbg): this should be supported by a method as well. { data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command") @@ -165,7 +165,7 @@ func MaybeInlineSideloadedRaftCommand( if err != nil { return nil, err } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return nil, nil } log.Event(ctx, "inlining sideloaded SSTable") @@ -213,7 +213,7 @@ func MaybeInlineSideloadedRaftCommand( // the EntryEncoding. { data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size()) - raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID) + raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID) _, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:]) if err != nil { return nil, err @@ -232,7 +232,7 @@ func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry) if err != nil { log.Fatalf(ctx, "%v", err) } - if typ != raftlog.EntryEncodingSideloaded { + if !typ.IsSideloaded() { return } diff --git a/pkg/kv/kvserver/logstore/sideload_test.go b/pkg/kv/kvserver/logstore/sideload_test.go index 65f98509af02..800ff9b9ce7b 100644 --- a/pkg/kv/kvserver/logstore/sideload_test.go +++ b/pkg/kv/kvserver/logstore/sideload_test.go @@ -51,7 +51,7 @@ func mustEntryEq(t testing.TB, l, r raftpb.Entry) { } func mkEnt( - v byte, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, + enc raftlog.EntryEncoding, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable, ) raftpb.Entry { cmdIDKey := strings.Repeat("x", raftlog.RaftCommandIDLen) var cmd kvserverpb.RaftCommand @@ -62,7 +62,7 @@ func mkEnt( } var ent raftpb.Entry ent.Index, ent.Term = index, term - ent.Data = raftlog.EncodeRaftCommand(v, kvserverbase.CmdIDKey(cmdIDKey), b) + ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b) return ent } @@ -355,7 +355,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - v1, v2 := raftlog.EntryEncodingStandardPrefixByte, raftlog.EntryEncodingSideloadedPrefixByte + v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC rangeID := roachpb.RangeID(1) type testCase struct { @@ -477,11 +477,11 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) { addSSTStripped := addSST addSSTStripped.Data = nil - entV1Reg := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 10, 99, nil) - entV1SST := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 11, 99, &addSST) - entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 12, 99, nil) - entV2SST := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSST) - entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSSTStripped) + entV1Reg := mkEnt(raftlog.EntryEncodingStandardWithAC, 10, 99, nil) + entV1SST := mkEnt(raftlog.EntryEncodingStandardWithAC, 11, 99, &addSST) + entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 12, 99, nil) + entV2SST := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSST) + entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSSTStripped) type tc struct { name string diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go index 68036eff3932..2bbf90e877ae 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go @@ -381,7 +381,7 @@ func raftLogFromPendingDescriptorUpdate( t.Fatalf("failed to serialize raftCommand: %v", err) } data := raftlog.EncodeRaftCommand( - raftlog.EntryEncodingStandardPrefixByte, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) + raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out) ent := raftpb.Entry{ Term: 1, Index: replica.RaftCommittedIndex + entryIndex, diff --git a/pkg/kv/kvserver/raft.go b/pkg/kv/kvserver/raft.go index 8ba284ad6e06..6e1b6fdb62a1 100644 --- a/pkg/kv/kvserver/raft.go +++ b/pkg/kv/kvserver/raft.go @@ -221,9 +221,9 @@ func raftEntryFormatter(data []byte) string { } // NB: a raft.EntryFormatter is only invoked for EntryNormal (raft methods // that call this take care of unwrapping the ConfChange), and since - // len(data)>0 it has to be EntryEncodingStandard or EntryEncodingSideloaded and - // they are encoded identically. - cmdID, data := raftlog.DecomposeRaftVersionStandardOrSideloaded(data) + // len(data)>0 it has to be {Deprecated,}EntryEncoding{Standard,Sideloaded} + // and they are encoded identically. + cmdID, data := raftlog.DecomposeRaftEncodingStandardOrSideloaded(data) return fmt.Sprintf("[%x] [%d]", cmdID, len(data)) } @@ -274,8 +274,11 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase continue } switch typ { - case raftlog.EntryEncodingStandard, raftlog.EntryEncodingSideloaded: - id, _ := raftlog.DecomposeRaftVersionStandardOrSideloaded(e.Data) + case raftlog.EntryEncodingStandardWithAC, + raftlog.EntryEncodingSideloadedWithAC, + raftlog.EntryEncodingStandardWithoutAC, + raftlog.EntryEncodingSideloadedWithoutAC: + id, _ := raftlog.DecomposeRaftEncodingStandardOrSideloaded(e.Data) ids = append(ids, id) case raftlog.EntryEncodingRaftConfChange, raftlog.EntryEncodingRaftConfChangeV2: // Configuration changes don't have the CmdIDKey easily accessible but are diff --git a/pkg/kv/kvserver/raftlog/encoding.go b/pkg/kv/kvserver/raftlog/encoding.go index dd52176a0607..bf312d57f233 100644 --- a/pkg/kv/kvserver/raftlog/encoding.go +++ b/pkg/kv/kvserver/raftlog/encoding.go @@ -23,83 +23,128 @@ import ( // and, in some cases, the first byte of the Entry's Data payload. type EntryEncoding byte -// TODO(tbg): use auto-assigned consts (iota) for the encodings below, since -// they aren't on the wire. - const ( - // EntryEncodingStandard is the default encoding for a CockroachDB raft log - // entry. - // - // This is a raftpb.Entry of type EntryNormal whose Data slice is either empty - // or whose first byte matches EntryEncodingStandardPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a - // kvserverpb.RaftCommand. - EntryEncodingStandard EntryEncoding = 0 - // EntryEncodingSideloaded indicates a proposal representing the result of a - // roachpb.AddSSTableRequest for which the payload (the SST) is stored outside - // the storage engine to improve storage performance. - // - // This is a raftpb.Entry of type EntryNormal whose data slice is either empty - // or whose first byte matches EntryEncodingSideloadedPrefixByte. The subsequent - // eight bytes represent a CmdIDKey. The remaining bytes represent a - // kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult holds a - // nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data field of - // which is an SST to be ingested (and which is present in memory but made - // durable via direct storage on the filesystem, bypassing the storage - // engine). - EntryEncodingSideloaded EntryEncoding = 1 // EntryEncodingEmpty is an empty entry. These are used by raft after // leader election. Since they hold no data, there is nothing in them to // decode. - EntryEncodingEmpty EntryEncoding = 253 + EntryEncodingEmpty EntryEncoding = iota + // EntryEncodingStandardWithAC is the default encoding for a CockroachDB + // raft log entry. + // + // This is a raftpb.Entry of type EntryNormal whose Data slice is either + // empty or whose first byte matches entryEncodingStandardWithACPrefixByte. + // The subsequent eight bytes represent a CmdIDKey. The remaining bytes + // represent a kvserverpb.RaftCommand that also includes data used for + // below-raft admission control (Admission{Priority,CreateTime,OriginNode}). + EntryEncodingStandardWithAC + // EntryEncodingSideloadedWithAC indicates a proposal representing the + // result of a roachpb.AddSSTableRequest for which the payload (the SST) is + // stored outside the storage engine to improve storage performance. + // + // This is a raftpb.Entry of type EntryNormal whose data slice is either + // empty or with first byte == entryEncodingSideloadedWithACPrefixByte. The + // subsequent eight bytes represent a CmdIDKey. The remaining bytes + // represent a kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult + // holds a nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data + // field of which is an SST to be ingested (and which is present in memory + // but made durable via direct storage on the filesystem, bypassing the + // storage engine). Admission{Priority,CreateTime,OriginNode} in the + // kvserverpb.RaftCommand are non-empty, data used for below-raft admission + // control. + EntryEncodingSideloadedWithAC + // EntryEncodingStandardWithoutAC is like EntryEncodingStandardWithAC but + // without the data for below-raft admission control. + EntryEncodingStandardWithoutAC + // EntryEncodingSideloadedWithoutAC is like EntryEncodingStandardWithoutAC + // but without below-raft admission metadata. + EntryEncodingSideloadedWithoutAC // EntryEncodingRaftConfChange is a raftpb.Entry whose raftpb.EntryType is // raftpb.EntryConfChange. The Entry's Data field holds a raftpb.ConfChange // whose Context field is a kvserverpb.ConfChangeContext whose Payload is a // kvserverpb.RaftCommand. In particular, the CmdIDKey requires a round of // protobuf unmarshaling. - EntryEncodingRaftConfChange EntryEncoding = 254 - // EntryEncodingRaftConfChangeV2 is analogous to EntryEncodingRaftConfChange, with - // the replacements raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} - // applied. - EntryEncodingRaftConfChangeV2 EntryEncoding = 255 + EntryEncodingRaftConfChange + // EntryEncodingRaftConfChangeV2 is analogous to + // EntryEncodingRaftConfChange, with the replacements + // raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} applied. + EntryEncodingRaftConfChangeV2 +) + +// IsSideloaded returns true if the encoding is +// EntryEncodingSideloadedWith{,out}AC. +func (enc EntryEncoding) IsSideloaded() bool { + return enc == EntryEncodingSideloadedWithAC || enc == EntryEncodingSideloadedWithoutAC +} + +// UsesAdmissionControl returns true if the encoding is +// EntryEncoding{Standard,Sideloaded}WithAC. +func (enc EntryEncoding) UsesAdmissionControl() bool { + return enc == EntryEncodingStandardWithAC || enc == EntryEncodingSideloadedWithAC +} + +// prefixByte returns the prefix byte used during encoding, applicable only to +// EntryEncoding{Standard,Sideloaded}With{,out}AC. +func (enc EntryEncoding) prefixByte() byte { + switch enc { + case EntryEncodingStandardWithAC: + return entryEncodingStandardWithACPrefixByte + case EntryEncodingSideloadedWithAC: + return entryEncodingSideloadedWithACPrefixByte + case EntryEncodingStandardWithoutAC: + return entryEncodingStandardWithoutACPrefixByte + case EntryEncodingSideloadedWithoutAC: + return entryEncodingSideloadedWithoutACPrefixByte + default: + panic(fmt.Sprintf("invalid encoding: %v has no prefix byte", enc)) + } +} + +const ( + // entryEncodingStandardWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithAC. + entryEncodingStandardWithACPrefixByte = byte(2) // 0b00000010 + // entryEncodingSideloadedWithACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithAC. + entryEncodingSideloadedWithACPrefixByte = byte(3) // 0b00000011 + // entryEncodingStandardWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingStandardWithoutAC. + entryEncodingStandardWithoutACPrefixByte = byte(0) // 0b00000000 + // entryEncodingSideloadedWithoutACPrefixByte is the first byte of a + // raftpb.Entry's Data slice for an Entry of encoding + // EntryEncodingSideloadedWithoutAC. + entryEncodingSideloadedWithoutACPrefixByte = byte(1) // 0b00000001 ) -// TODO(tbg): when we have a good library for encoding entries, these should -// no longer be exported. const ( // RaftCommandIDLen is the length of a command ID. RaftCommandIDLen = 8 - // RaftCommandPrefixLen is the length of the prefix of raft entries that - // use the EntryEncodingStandard or EntryEncodingSideloaded encodings. The - // bytes after the prefix represent the kvserverpb.RaftCommand. - // + // RaftCommandPrefixLen is the length of the prefix of raft entries that use + // the EntryEncoding{Standard,Sideloaded}With{,out}AC encodings. The bytes + // after the prefix represent the kvserverpb.RaftCommand. RaftCommandPrefixLen = 1 + RaftCommandIDLen - // EntryEncodingStandardPrefixByte is the first byte of a raftpb.Entry's - // Data slice for an Entry of encoding EntryEncodingStandard. - EntryEncodingStandardPrefixByte = byte(0) - // EntryEncodingSideloadedPrefixByte is the first byte of a raftpb.Entry's Data - // slice for an Entry of encoding EntryEncodingSideloaded. - EntryEncodingSideloadedPrefixByte = byte(1) ) -// EncodeRaftCommand encodes a raft command of type EntryEncodingStandard or -// EntryEncodingSideloaded. -func EncodeRaftCommand(prefixByte byte, commandID kvserverbase.CmdIDKey, command []byte) []byte { +// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using +// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). +func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte { b := make([]byte, RaftCommandPrefixLen+len(command)) - EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], prefixByte, commandID) + EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID) copy(b[RaftCommandPrefixLen:], command) return b } -// EncodeRaftCommandPrefix encodes the prefix for a Raft command of type -// EntryEncodingStandard or EntryEncodingSideloaded. -func EncodeRaftCommandPrefix(b []byte, prefixByte byte, commandID kvserverbase.CmdIDKey) { +// EncodeRaftCommandPrefix encodes the prefix for a Raft command, using the +// given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC). +func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase.CmdIDKey) { if len(commandID) != RaftCommandIDLen { panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), RaftCommandIDLen)) } if len(b) != RaftCommandPrefixLen { panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), RaftCommandPrefixLen)) } - b[0] = prefixByte + b[0] = enc.prefixByte() copy(b[1:], commandID) } diff --git a/pkg/kv/kvserver/raftlog/entry.go b/pkg/kv/kvserver/raftlog/entry.go index d0d04ba2378d..f05d10baa3dd 100644 --- a/pkg/kv/kvserver/raftlog/entry.go +++ b/pkg/kv/kvserver/raftlog/entry.go @@ -36,36 +36,40 @@ func EncodingOf(ent raftpb.Entry) (EntryEncoding, error) { } switch ent.Type { - case raftpb.EntryNormal: case raftpb.EntryConfChange: return EntryEncodingRaftConfChange, nil case raftpb.EntryConfChangeV2: return EntryEncodingRaftConfChangeV2, nil + case raftpb.EntryNormal: default: return 0, errors.AssertionFailedf("unknown EntryType %d", ent.Type) } switch ent.Data[0] { - case EntryEncodingStandardPrefixByte: - return EntryEncodingStandard, nil - case EntryEncodingSideloadedPrefixByte: - return EntryEncodingSideloaded, nil + case entryEncodingStandardWithACPrefixByte: + return EntryEncodingStandardWithAC, nil + case entryEncodingSideloadedWithACPrefixByte: + return EntryEncodingSideloadedWithAC, nil + case entryEncodingStandardWithoutACPrefixByte: + return EntryEncodingStandardWithoutAC, nil + case entryEncodingSideloadedWithoutACPrefixByte: + return EntryEncodingSideloadedWithoutAC, nil default: return 0, errors.AssertionFailedf("unknown command encoding version %d", ent.Data[0]) } } -// DecomposeRaftVersionStandardOrSideloaded extracts the CmdIDKey and the -// marshaled kvserverpb.RaftCommand from a slice which is known to have come -// from a raftpb.Entry of type raftlog.EntryEncodingStandard or -// raftlog.EntryEncodingSideloaded (which, mod the prefix byte, share an -// encoding). -func DecomposeRaftVersionStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { +// DecomposeRaftEncodingStandardOrSideloaded extracts the CmdIDKey and the +// marshaled kvserverpb.RaftCommand from a raftpb.Entry slice known to have +// Entry with type EntryEncoding{Standard,Sideloaded}With{,out}AC. +// All these variants, mod the prefix byte, share an encoding. +func DecomposeRaftEncodingStandardOrSideloaded(data []byte) (kvserverbase.CmdIDKey, []byte) { return kvserverbase.CmdIDKey(data[1 : 1+RaftCommandIDLen]), data[1+RaftCommandIDLen:] } // Entry contains data related to a raft log entry. This is the raftpb.Entry -// itself but also all encapsulated data relevant for command application. +// itself but also all encapsulated data relevant for command application and +// admission control. type Entry struct { raftpb.Entry ID kvserverbase.CmdIDKey // may be empty for zero Entry @@ -73,6 +77,10 @@ type Entry struct { ConfChangeV1 *raftpb.ConfChange // only set for config change ConfChangeV2 *raftpb.ConfChangeV2 // only set for config change ConfChangeContext *kvserverpb.ConfChangeContext // only set for config change + // ApplyAdmissionControl determines whether this entry is subject to + // replication admission control. Only applies for entries with encoding + // EntryEncoding{Standard,Sideloaded}WithAC. + ApplyAdmissionControl bool } var entryPool = sync.Pool{ @@ -145,8 +153,11 @@ func (e *Entry) load() error { AsV2() raftpb.ConfChangeV2 } switch typ { - case EntryEncodingStandard, EntryEncodingSideloaded: - e.ID, raftCmdBytes = DecomposeRaftVersionStandardOrSideloaded(e.Entry.Data) + case EntryEncodingStandardWithAC, EntryEncodingSideloadedWithAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) + e.ApplyAdmissionControl = true + case EntryEncodingStandardWithoutAC, EntryEncodingSideloadedWithoutAC: + e.ID, raftCmdBytes = DecomposeRaftEncodingStandardOrSideloaded(e.Entry.Data) case EntryEncodingEmpty: // Nothing to load, the empty raftpb.Entry is represented by a trivial // Entry. diff --git a/pkg/kv/kvserver/raftlog/entry_test.go b/pkg/kv/kvserver/raftlog/entry_test.go index f7ac225e81c3..6893d4b3541c 100644 --- a/pkg/kv/kvserver/raftlog/entry_test.go +++ b/pkg/kv/kvserver/raftlog/entry_test.go @@ -25,7 +25,7 @@ func TestLoadInvalidEntry(t *testing.T) { Data: EncodeRaftCommand( // It would be nice to have an "even more invalid" command here but it // turns out that DecodeRaftCommand "handles" errors via panic(). - EntryEncodingStandardPrefixByte, "foobarzz", []byte("definitely not a protobuf"), + EntryEncodingStandardWithAC, "foobarzz", []byte("definitely not a protobuf"), ), } ent, err := NewEntry(invalidEnt) diff --git a/pkg/kv/kvserver/raftlog/iter_bench_test.go b/pkg/kv/kvserver/raftlog/iter_bench_test.go index 722510919d8f..adc0850b9c37 100644 --- a/pkg/kv/kvserver/raftlog/iter_bench_test.go +++ b/pkg/kv/kvserver/raftlog/iter_bench_test.go @@ -59,10 +59,9 @@ func (m *mockReader) NewMVCCIterator( return m.iter } -func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { +func mkRaftCommand(keySize, valSize, writeBatchSize int) *kvserverpb.RaftCommand { r := rand.New(rand.NewSource(123)) - // A realistic-ish raft command for a ~1kb write. - cmd := &kvserverpb.RaftCommand{ + return &kvserverpb.RaftCommand{ ProposerLeaseSequence: 1, MaxLeaseIndex: 1159192591, ClosedTimestamp: &hlc.Timestamp{WallTime: 12512591925, Logical: 1}, @@ -79,20 +78,25 @@ func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { }, RaftLogDelta: 1300, }, - WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, 2000)}, + WriteBatch: &kvserverpb.WriteBatch{Data: randutil.RandBytes(r, writeBatchSize)}, LogicalOpLog: &kvserverpb.LogicalOpLog{Ops: []enginepb.MVCCLogicalOp{ { WriteValue: &enginepb.MVCCWriteValueOp{ - Key: roachpb.Key(randutil.RandBytes(r, 100)), + Key: roachpb.Key(randutil.RandBytes(r, keySize)), Timestamp: hlc.Timestamp{WallTime: 1284581285}, - Value: roachpb.Key(randutil.RandBytes(r, 1800)), + Value: roachpb.Key(randutil.RandBytes(r, valSize)), }, }, }}, } +} + +func mkBenchEnt(b *testing.B) (_ raftpb.Entry, metaB []byte) { + // A realistic-ish raft command for a ~1kb write. + cmd := mkRaftCommand(100, 1800, 2000) cmdB, err := protoutil.Marshal(cmd) require.NoError(b, err) - data := EncodeRaftCommand(EntryEncodingStandardPrefixByte, "cmd12345", cmdB) + data := EncodeRaftCommand(EntryEncodingStandardWithoutAC, "cmd12345", cmdB) ent := raftpb.Entry{ Term: 1, diff --git a/pkg/kv/kvserver/raftlog/iter_test.go b/pkg/kv/kvserver/raftlog/iter_test.go index 189c4f965f95..6d372e654885 100644 --- a/pkg/kv/kvserver/raftlog/iter_test.go +++ b/pkg/kv/kvserver/raftlog/iter_test.go @@ -44,11 +44,11 @@ func ents(inds ...uint64) []raftpb.Entry { typ := raftpb.EntryType(ind % 3) switch typ { case raftpb.EntryNormal: - prefixByte := EntryEncodingStandardPrefixByte + enc := EntryEncodingStandardWithAC if ind%2 == 0 { - prefixByte = EntryEncodingSideloadedPrefixByte + enc = EntryEncodingSideloadedWithAC } - data = EncodeRaftCommand(prefixByte, cmdID, b) + data = EncodeRaftCommand(enc, cmdID, b) case raftpb.EntryConfChangeV2: c := kvserverpb.ConfChangeContext{ CommandID: string(cmdID), diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 05be5ad0d3da..5b872d6e2771 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -303,7 +303,7 @@ func (pc proposalCreator) encodeProposal(p *ProposalData) []byte { cmdLen := p.command.Size() needed := raftlog.RaftCommandPrefixLen + cmdLen + kvserverpb.MaxRaftCommandFooterSize() data := make([]byte, raftlog.RaftCommandPrefixLen, needed) - raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, raftlog.EntryEncodingStandardWithoutAC, p.idKey) data = data[:raftlog.RaftCommandPrefixLen+p.command.Size()] if _, err := protoutil.MarshalTo(p.command, data[raftlog.RaftCommandPrefixLen:]); err != nil { panic(err) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index ea9ed9da0700..5b11de2dcd98 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -344,7 +344,7 @@ func (r *Replica) propose( // Determine the encoding style for the Raft command. prefix := true - encodingPrefixByte := raftlog.EntryEncodingStandardPrefixByte + entryEncoding := raftlog.EntryEncodingStandardWithoutAC if crt := p.command.ReplicatedEvalResult.ChangeReplicas; crt != nil { // EndTxnRequest with a ChangeReplicasTrigger is special because Raft // needs to understand it; it cannot simply be an opaque command. To @@ -416,7 +416,7 @@ func (r *Replica) propose( } } else if p.command.ReplicatedEvalResult.AddSSTable != nil { log.VEvent(p.ctx, 4, "sideloadable proposal detected") - encodingPrefixByte = raftlog.EntryEncodingSideloadedPrefixByte + entryEncoding = raftlog.EntryEncodingSideloadedWithoutAC r.store.metrics.AddSSTableProposals.Inc(1) if p.command.ReplicatedEvalResult.AddSSTable.Data == nil { @@ -438,7 +438,7 @@ func (r *Replica) propose( data := make([]byte, preLen, needed) // Encode prefix with command ID, if necessary. if prefix { - raftlog.EncodeRaftCommandPrefix(data, encodingPrefixByte, p.idKey) + raftlog.EncodeRaftCommandPrefix(data, entryEncoding, p.idKey) } // Encode body of command. data = data[:preLen+cmdLen] diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 1fb80c595813..147ee7447721 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -87,7 +87,7 @@ func (r *Replica) maybeUnquiesceAndWakeLeaderLocked() bool { r.store.unquiescedReplicas.Unlock() r.maybeCampaignOnWakeLocked(ctx) // Propose an empty command which will wake the leader. - data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, makeIDKey(), nil) + data := raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, makeIDKey(), nil) _ = r.mu.internalRaftGroup.Propose(data) return true } diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index 2c7de2197518..74501c638a78 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -199,7 +199,7 @@ func TestRaftSSTableSideloading(t *testing.T) { var idx int for idx = 0; idx < len(ents); idx++ { // Get the SST back from the raft log. - if typ, _ := raftlog.EncodingOf(ents[idx]); typ != raftlog.EntryEncodingSideloaded { + if typ, _ := raftlog.EncodingOf(ents[idx]); !typ.IsSideloaded() { continue } ent, err := logstore.MaybeInlineSideloadedRaftCommand(ctx, tc.repl.RangeID, ents[idx], tc.repl.raftMu.sideloaded, tc.store.raftEntryCache) diff --git a/pkg/sql/sqlstats/insights/registry.go b/pkg/sql/sqlstats/insights/registry.go index bcc0bff965d1..9df960f5caed 100644 --- a/pkg/sql/sqlstats/insights/registry.go +++ b/pkg/sql/sqlstats/insights/registry.go @@ -103,13 +103,29 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac slowStatements.Add(i) } } - if slowStatements.Empty() { + + // So far this is the only case when a transaction is considered slow. + // In the future, we may want to make a detector for transactions if there + // are more cases. + highContention := false + if transaction.Contention != nil { + highContention = transaction.Contention.Seconds() >= LatencyThreshold.Get(&r.causes.st.SV).Seconds() + } + + if slowStatements.Empty() && !highContention { + // We only record an insight if we have slow statements or high txn contention. return } + // Note that we'll record insights for every statement, not just for // the slow ones. insight := makeInsight(sessionID, transaction) + if highContention { + insight.Transaction.Problems = addProblem(insight.Transaction.Problems, Problem_SlowExecution) + insight.Transaction.Causes = addCause(insight.Transaction.Causes, Cause_HighContention) + } + for i, s := range *statements { if slowStatements.Contains(i) { switch s.Status { diff --git a/pkg/sql/sqlstats/insights/registry_test.go b/pkg/sql/sqlstats/insights/registry_test.go index e11519c7005b..573888699024 100644 --- a/pkg/sql/sqlstats/insights/registry_test.go +++ b/pkg/sql/sqlstats/insights/registry_test.go @@ -254,4 +254,46 @@ func TestRegistry(t *testing.T) { registry := newRegistry(st, &latencyThresholdDetector{st: st}, newStore(st)) require.NotPanics(t, func() { registry.ObserveTransaction(session.ID, transaction) }) }) + + t.Run("txn with high accumulated contention without high single stmt contention", func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + store := newStore(st) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + contentionDuration := 10 * time.Second + statement := &Statement{ + Status: Statement_Completed, + ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")), + FingerprintID: roachpb.StmtFingerprintID(100), + LatencyInSeconds: 0.00001, + } + txnHighContention := &Transaction{ID: uuid.FastMakeV4(), Contention: &contentionDuration} + + registry.ObserveStatement(session.ID, statement) + registry.ObserveTransaction(session.ID, txnHighContention) + + expected := []*Insight{ + { + Session: session, + Transaction: &Transaction{ + ID: txnHighContention.ID, + Contention: &contentionDuration, + StmtExecutionIDs: txnHighContention.StmtExecutionIDs, + Problems: []Problem{Problem_SlowExecution}, + Causes: []Cause{Cause_HighContention}}, + Statements: []*Statement{ + newStmtWithProblemAndCauses(statement, Problem_None, nil), + }, + }, + } + + var actual []*Insight + store.IterateInsights( + context.Background(), + func(ctx context.Context, o *Insight) { + actual = append(actual, o) + }, + ) + + require.Equal(t, expected, actual) + }) }