Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftlog: introduce EntryEncoding{Standard,Sideloaded}WithAC #95748

Merged
merged 1 commit into from
Jan 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func LoadTerm(
if err != nil {
return 0, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
eCache.Add(rangeID, []raftpb.Entry{entry}, false /* truncate */)
}
return entry.Term, nil
Expand Down Expand Up @@ -405,7 +405,7 @@ func LoadEntries(
if err != nil {
return err
}
if typ == raftlog.EntryEncodingSideloaded {
if typ.IsSideloaded() {
newEnt, err := MaybeInlineSideloadedRaftCommand(
ctx, rangeID, ent, sideloaded, eCache,
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) {
Term: 1,
Index: 1,
Type: raftpb.EntryNormal,
Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardPrefixByte, "deadbeef", data),
Data: raftlog.EncodeRaftCommand(raftlog.EntryEncodingStandardWithoutAC, "deadbeef", data),
})
stats := &AppendStats{}

Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/logstore/sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func MaybeSideloadEntries(
if err != nil {
return nil, 0, 0, 0, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
otherEntriesSize += int64(len(input[i].Data))
continue
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func MaybeSideloadEntries(
// TODO(tbg): this should be supported by a method as well.
{
data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size())
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID)
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID)
_, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
if err != nil {
return nil, 0, 0, 0, errors.Wrap(err, "while marshaling stripped sideloaded command")
Expand Down Expand Up @@ -165,7 +165,7 @@ func MaybeInlineSideloadedRaftCommand(
if err != nil {
return nil, err
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
return nil, nil
}
log.Event(ctx, "inlining sideloaded SSTable")
Expand Down Expand Up @@ -213,7 +213,7 @@ func MaybeInlineSideloadedRaftCommand(
// the EntryEncoding.
{
data := make([]byte, raftlog.RaftCommandPrefixLen+e.Cmd.Size())
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], raftlog.EntryEncodingSideloadedPrefixByte, e.ID)
raftlog.EncodeRaftCommandPrefix(data[:raftlog.RaftCommandPrefixLen], typ, e.ID)
_, err := protoutil.MarshalTo(&e.Cmd, data[raftlog.RaftCommandPrefixLen:])
if err != nil {
return nil, err
Expand All @@ -232,7 +232,7 @@ func AssertSideloadedRaftCommandInlined(ctx context.Context, ent *raftpb.Entry)
if err != nil {
log.Fatalf(ctx, "%v", err)
}
if typ != raftlog.EntryEncodingSideloaded {
if !typ.IsSideloaded() {
return
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/logstore/sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func mustEntryEq(t testing.TB, l, r raftpb.Entry) {
}

func mkEnt(
v byte, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable,
enc raftlog.EntryEncoding, index, term uint64, as *kvserverpb.ReplicatedEvalResult_AddSSTable,
) raftpb.Entry {
cmdIDKey := strings.Repeat("x", raftlog.RaftCommandIDLen)
var cmd kvserverpb.RaftCommand
Expand All @@ -62,7 +62,7 @@ func mkEnt(
}
var ent raftpb.Entry
ent.Index, ent.Term = index, term
ent.Data = raftlog.EncodeRaftCommand(v, kvserverbase.CmdIDKey(cmdIDKey), b)
ent.Data = raftlog.EncodeRaftCommand(enc, kvserverbase.CmdIDKey(cmdIDKey), b)
return ent
}

Expand Down Expand Up @@ -355,7 +355,7 @@ func TestRaftSSTableSideloadingInline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

v1, v2 := raftlog.EntryEncodingStandardPrefixByte, raftlog.EntryEncodingSideloadedPrefixByte
v1, v2 := raftlog.EntryEncodingStandardWithAC, raftlog.EntryEncodingSideloadedWithAC
rangeID := roachpb.RangeID(1)

type testCase struct {
Expand Down Expand Up @@ -477,11 +477,11 @@ func TestRaftSSTableSideloadingSideload(t *testing.T) {
addSSTStripped := addSST
addSSTStripped.Data = nil

entV1Reg := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 10, 99, nil)
entV1SST := mkEnt(raftlog.EntryEncodingStandardPrefixByte, 11, 99, &addSST)
entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 12, 99, nil)
entV2SST := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSST)
entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedPrefixByte, 13, 99, &addSSTStripped)
entV1Reg := mkEnt(raftlog.EntryEncodingStandardWithAC, 10, 99, nil)
entV1SST := mkEnt(raftlog.EntryEncodingStandardWithAC, 11, 99, &addSST)
entV2Reg := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 12, 99, nil)
entV2SST := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSST)
entV2SSTStripped := mkEnt(raftlog.EntryEncodingSideloadedWithAC, 13, 99, &addSSTStripped)

type tc struct {
name string
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/loqrecovery/recovery_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func raftLogFromPendingDescriptorUpdate(
t.Fatalf("failed to serialize raftCommand: %v", err)
}
data := raftlog.EncodeRaftCommand(
raftlog.EntryEncodingStandardPrefixByte, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out)
raftlog.EntryEncodingStandardWithoutAC, kvserverbase.CmdIDKey(fmt.Sprintf("%08d", entryIndex)), out)
ent := raftpb.Entry{
Term: 1,
Index: replica.RaftCommittedIndex + entryIndex,
Expand Down
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,9 @@ func raftEntryFormatter(data []byte) string {
}
// NB: a raft.EntryFormatter is only invoked for EntryNormal (raft methods
// that call this take care of unwrapping the ConfChange), and since
// len(data)>0 it has to be EntryEncodingStandard or EntryEncodingSideloaded and
// they are encoded identically.
cmdID, data := raftlog.DecomposeRaftVersionStandardOrSideloaded(data)
// len(data)>0 it has to be {Deprecated,}EntryEncoding{Standard,Sideloaded}
// and they are encoded identically.
cmdID, data := raftlog.DecomposeRaftEncodingStandardOrSideloaded(data)
return fmt.Sprintf("[%x] [%d]", cmdID, len(data))
}

Expand Down Expand Up @@ -274,8 +274,11 @@ func extractIDs(ids []kvserverbase.CmdIDKey, ents []raftpb.Entry) []kvserverbase
continue
}
switch typ {
case raftlog.EntryEncodingStandard, raftlog.EntryEncodingSideloaded:
id, _ := raftlog.DecomposeRaftVersionStandardOrSideloaded(e.Data)
case raftlog.EntryEncodingStandardWithAC,
raftlog.EntryEncodingSideloadedWithAC,
raftlog.EntryEncodingStandardWithoutAC,
raftlog.EntryEncodingSideloadedWithoutAC:
id, _ := raftlog.DecomposeRaftEncodingStandardOrSideloaded(e.Data)
ids = append(ids, id)
case raftlog.EntryEncodingRaftConfChange, raftlog.EntryEncodingRaftConfChangeV2:
// Configuration changes don't have the CmdIDKey easily accessible but are
Expand Down
145 changes: 95 additions & 50 deletions pkg/kv/kvserver/raftlog/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,83 +23,128 @@ import (
// and, in some cases, the first byte of the Entry's Data payload.
type EntryEncoding byte

// TODO(tbg): use auto-assigned consts (iota) for the encodings below, since
// they aren't on the wire.

const (
// EntryEncodingStandard is the default encoding for a CockroachDB raft log
// entry.
//
// This is a raftpb.Entry of type EntryNormal whose Data slice is either empty
// or whose first byte matches EntryEncodingStandardPrefixByte. The subsequent
// eight bytes represent a CmdIDKey. The remaining bytes represent a
// kvserverpb.RaftCommand.
EntryEncodingStandard EntryEncoding = 0
// EntryEncodingSideloaded indicates a proposal representing the result of a
// roachpb.AddSSTableRequest for which the payload (the SST) is stored outside
// the storage engine to improve storage performance.
//
// This is a raftpb.Entry of type EntryNormal whose data slice is either empty
// or whose first byte matches EntryEncodingSideloadedPrefixByte. The subsequent
// eight bytes represent a CmdIDKey. The remaining bytes represent a
// kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult holds a
// nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data field of
// which is an SST to be ingested (and which is present in memory but made
// durable via direct storage on the filesystem, bypassing the storage
// engine).
EntryEncodingSideloaded EntryEncoding = 1
// EntryEncodingEmpty is an empty entry. These are used by raft after
// leader election. Since they hold no data, there is nothing in them to
// decode.
EntryEncodingEmpty EntryEncoding = 253
EntryEncodingEmpty EntryEncoding = iota
// EntryEncodingStandardWithAC is the default encoding for a CockroachDB
// raft log entry.
//
// This is a raftpb.Entry of type EntryNormal whose Data slice is either
// empty or whose first byte matches entryEncodingStandardWithACPrefixByte.
// The subsequent eight bytes represent a CmdIDKey. The remaining bytes
// represent a kvserverpb.RaftCommand that also includes data used for
// below-raft admission control (Admission{Priority,CreateTime,OriginNode}).
EntryEncodingStandardWithAC
// EntryEncodingSideloadedWithAC indicates a proposal representing the
// result of a roachpb.AddSSTableRequest for which the payload (the SST) is
// stored outside the storage engine to improve storage performance.
//
// This is a raftpb.Entry of type EntryNormal whose data slice is either
// empty or with first byte == entryEncodingSideloadedWithACPrefixByte. The
// subsequent eight bytes represent a CmdIDKey. The remaining bytes
// represent a kvserverpb.RaftCommand whose kvserverpb.ReplicatedEvalResult
// holds a nontrival kvserverpb.ReplicatedEvalResult_AddSSTable, the Data
// field of which is an SST to be ingested (and which is present in memory
// but made durable via direct storage on the filesystem, bypassing the
// storage engine). Admission{Priority,CreateTime,OriginNode} in the
// kvserverpb.RaftCommand are non-empty, data used for below-raft admission
// control.
EntryEncodingSideloadedWithAC
// EntryEncodingStandardWithoutAC is like EntryEncodingStandardWithAC but
// without the data for below-raft admission control.
EntryEncodingStandardWithoutAC
// EntryEncodingSideloadedWithoutAC is like EntryEncodingStandardWithoutAC
// but without below-raft admission metadata.
EntryEncodingSideloadedWithoutAC
// EntryEncodingRaftConfChange is a raftpb.Entry whose raftpb.EntryType is
// raftpb.EntryConfChange. The Entry's Data field holds a raftpb.ConfChange
// whose Context field is a kvserverpb.ConfChangeContext whose Payload is a
// kvserverpb.RaftCommand. In particular, the CmdIDKey requires a round of
// protobuf unmarshaling.
EntryEncodingRaftConfChange EntryEncoding = 254
// EntryEncodingRaftConfChangeV2 is analogous to EntryEncodingRaftConfChange, with
// the replacements raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2}
// applied.
EntryEncodingRaftConfChangeV2 EntryEncoding = 255
EntryEncodingRaftConfChange
// EntryEncodingRaftConfChangeV2 is analogous to
// EntryEncodingRaftConfChange, with the replacements
// raftpb.EntryConfChange{,V2} and raftpb.ConfChange{,V2} applied.
EntryEncodingRaftConfChangeV2
)

// IsSideloaded returns true if the encoding is
// EntryEncodingSideloadedWith{,out}AC.
func (enc EntryEncoding) IsSideloaded() bool {
return enc == EntryEncodingSideloadedWithAC || enc == EntryEncodingSideloadedWithoutAC
}

// UsesAdmissionControl returns true if the encoding is
// EntryEncoding{Standard,Sideloaded}WithAC.
func (enc EntryEncoding) UsesAdmissionControl() bool {
return enc == EntryEncodingStandardWithAC || enc == EntryEncodingSideloadedWithAC
}

// prefixByte returns the prefix byte used during encoding, applicable only to
// EntryEncoding{Standard,Sideloaded}With{,out}AC.
func (enc EntryEncoding) prefixByte() byte {
switch enc {
case EntryEncodingStandardWithAC:
return entryEncodingStandardWithACPrefixByte
case EntryEncodingSideloadedWithAC:
return entryEncodingSideloadedWithACPrefixByte
case EntryEncodingStandardWithoutAC:
return entryEncodingStandardWithoutACPrefixByte
case EntryEncodingSideloadedWithoutAC:
return entryEncodingSideloadedWithoutACPrefixByte
default:
panic(fmt.Sprintf("invalid encoding: %v has no prefix byte", enc))
}
}

const (
// entryEncodingStandardWithACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingStandardWithAC.
entryEncodingStandardWithACPrefixByte = byte(2) // 0b00000010
// entryEncodingSideloadedWithACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingSideloadedWithAC.
entryEncodingSideloadedWithACPrefixByte = byte(3) // 0b00000011
// entryEncodingStandardWithoutACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingStandardWithoutAC.
entryEncodingStandardWithoutACPrefixByte = byte(0) // 0b00000000
// entryEncodingSideloadedWithoutACPrefixByte is the first byte of a
// raftpb.Entry's Data slice for an Entry of encoding
// EntryEncodingSideloadedWithoutAC.
entryEncodingSideloadedWithoutACPrefixByte = byte(1) // 0b00000001
)

// TODO(tbg): when we have a good library for encoding entries, these should
// no longer be exported.
const (
// RaftCommandIDLen is the length of a command ID.
RaftCommandIDLen = 8
// RaftCommandPrefixLen is the length of the prefix of raft entries that
// use the EntryEncodingStandard or EntryEncodingSideloaded encodings. The
// bytes after the prefix represent the kvserverpb.RaftCommand.
//
// RaftCommandPrefixLen is the length of the prefix of raft entries that use
// the EntryEncoding{Standard,Sideloaded}With{,out}AC encodings. The bytes
// after the prefix represent the kvserverpb.RaftCommand.
irfansharif marked this conversation as resolved.
Show resolved Hide resolved
RaftCommandPrefixLen = 1 + RaftCommandIDLen
// EntryEncodingStandardPrefixByte is the first byte of a raftpb.Entry's
// Data slice for an Entry of encoding EntryEncodingStandard.
EntryEncodingStandardPrefixByte = byte(0)
// EntryEncodingSideloadedPrefixByte is the first byte of a raftpb.Entry's Data
// slice for an Entry of encoding EntryEncodingSideloaded.
EntryEncodingSideloadedPrefixByte = byte(1)
)

// EncodeRaftCommand encodes a raft command of type EntryEncodingStandard or
// EntryEncodingSideloaded.
func EncodeRaftCommand(prefixByte byte, commandID kvserverbase.CmdIDKey, command []byte) []byte {
// EncodeRaftCommand encodes a marshaled kvserverpb.RaftCommand using
// the given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC).
func EncodeRaftCommand(enc EntryEncoding, commandID kvserverbase.CmdIDKey, command []byte) []byte {
b := make([]byte, RaftCommandPrefixLen+len(command))
EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], prefixByte, commandID)
EncodeRaftCommandPrefix(b[:RaftCommandPrefixLen], enc, commandID)
copy(b[RaftCommandPrefixLen:], command)
return b
}

// EncodeRaftCommandPrefix encodes the prefix for a Raft command of type
// EntryEncodingStandard or EntryEncodingSideloaded.
func EncodeRaftCommandPrefix(b []byte, prefixByte byte, commandID kvserverbase.CmdIDKey) {
// EncodeRaftCommandPrefix encodes the prefix for a Raft command, using the
// given encoding (one of EntryEncoding{Standard,Sideloaded}With{,out}AC).
func EncodeRaftCommandPrefix(b []byte, enc EntryEncoding, commandID kvserverbase.CmdIDKey) {
if len(commandID) != RaftCommandIDLen {
panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), RaftCommandIDLen))
}
if len(b) != RaftCommandPrefixLen {
panic(fmt.Sprintf("invalid command prefix length; %d != %d", len(b), RaftCommandPrefixLen))
}
b[0] = prefixByte
b[0] = enc.prefixByte()
copy(b[1:], commandID)
}
Loading