Skip to content

Commit

Permalink
storage: absorb RaftCommand into ReplicatedProposalData
Browse files Browse the repository at this point in the history
Remove the `storagebase.RaftCommand` proto by moving its fields onto
`ReplicatedProposalData` while preserving the tag numbers. Use that message
instead of `RaftCommand` throughout, including on the wire (since the tag
numbers were preserved, this does not require any special handling).

This in preparation for a follow-up change which adds an experimental switch to
use proposer-evaluated KV (#6166).
  • Loading branch information
tbg committed Oct 31, 2016
1 parent f4a8abc commit 11825cc
Show file tree
Hide file tree
Showing 11 changed files with 329 additions and 422 deletions.
6 changes: 3 additions & 3 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,12 @@ func tryRaftLogEntry(kv engine.MVCCKeyValue) (string, error) {
if ent.Type == raftpb.EntryNormal {
if len(ent.Data) > 0 {
_, cmdData := storage.DecodeRaftCommand(ent.Data)
var cmd storagebase.RaftCommand
var cmd storagebase.ReplicatedProposalData
if err := cmd.Unmarshal(cmdData); err != nil {
return "", err
}
ent.Data = nil
return fmt.Sprintf("%s by %v\n%s\n%s\n", &ent, cmd.OriginReplica, &cmd.Cmd, &cmd), nil
return fmt.Sprintf("%s by %v\n%s\n%s\n", &ent, cmd.OriginReplica, cmd.Cmd, &cmd), nil
}
return fmt.Sprintf("%s: EMPTY\n", &ent), nil
} else if ent.Type == raftpb.EntryConfChange {
Expand All @@ -441,7 +441,7 @@ func tryRaftLogEntry(kv engine.MVCCKeyValue) (string, error) {
if err := ctx.Unmarshal(cc.Context); err != nil {
return "", err
}
var cmd storagebase.RaftCommand
var cmd storagebase.ReplicatedProposalData
if err := cmd.Unmarshal(ctx.Payload); err != nil {
return "", err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/roachpb/string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func TestTransactionString(t *testing.T) {
var txnEmpty roachpb.Transaction
_ = txnEmpty.String() // prevent regression of NPE

var cmd storagebase.RaftCommand
cmd := storagebase.ReplicatedProposalData{
Cmd: &roachpb.BatchRequest{},
}
cmd.Cmd.Txn = &txn
if actStr, idStr := fmt.Sprintf("%s", &cmd), txn.ID.String(); !strings.Contains(actStr, idStr) {
t.Fatalf("expected to find '%s' in '%s'", idStr, actStr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func raftEntryFormatter(data []byte) string {
// large snapshot entries.
return fmt.Sprintf("[%x] [%d]", commandID, len(data))
}
var cmd storagebase.RaftCommand
var cmd storagebase.ReplicatedProposalData
if err := proto.Unmarshal(encodedCmd, &cmd); err != nil {
return fmt.Sprintf("[error parsing entry: %s]", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/raft.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/storage/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ message ConfChangeContext {
(gogoproto.customname) = "CommandID"];

// Payload is the application-level command (i.e. an encoded
// storagebase.RaftCommand).
// storagebase.ReplicatedProposalData).
optional bytes payload = 2;

// Replica contains full details about the replica being added or removed.
Expand Down
48 changes: 29 additions & 19 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1718,10 +1718,10 @@ func (r *Replica) evaluateProposal(
// since evaluating a proposal is expensive (at least under proposer-
// evaluated KV).
var pd ProposalData
pd.RaftCommand = &storagebase.RaftCommand{
pd.ReplicatedProposalData = storagebase.ReplicatedProposalData{
RangeID: r.RangeID,
OriginReplica: replica,
Cmd: ba,
Cmd: &ba,
}
pd.ctx = ctx
pd.idKey = idKey
Expand All @@ -1737,13 +1737,13 @@ func (r *Replica) insertProposalLocked(pd *ProposalData) {
if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex {
r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex
}
if !pd.RaftCommand.Cmd.IsLeaseRequest() {
if !pd.Cmd.IsLeaseRequest() {
r.mu.lastAssignedLeaseIndex++
}
pd.RaftCommand.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex
pd.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex
if log.V(4) {
log.Infof(pd.ctx, "submitting proposal %x: maxLeaseIndex=%d",
pd.idKey, pd.RaftCommand.MaxLeaseIndex)
pd.idKey, pd.MaxLeaseIndex)
}

if _, ok := r.mu.proposals[pd.idKey]; ok {
Expand Down Expand Up @@ -1838,26 +1838,26 @@ func (r *Replica) isSoloReplicaLocked() bool {
}

func defaultSubmitProposalLocked(r *Replica, p *ProposalData) error {
if p.RaftCommand.Cmd.Timestamp == hlc.ZeroTimestamp {
if p.Cmd.Timestamp == hlc.ZeroTimestamp {
return errors.Errorf("can't propose Raft command with zero timestamp")
}

ctx := r.AnnotateCtx(context.TODO())

data, err := protoutil.Marshal(p.RaftCommand)
data, err := protoutil.Marshal(&p.ReplicatedProposalData)
if err != nil {
return err
}
defer r.store.enqueueRaftUpdateCheck(r.RangeID)

if union, ok := p.RaftCommand.Cmd.GetArg(roachpb.EndTransaction); ok {
if union, ok := p.Cmd.GetArg(roachpb.EndTransaction); ok {
ict := union.(*roachpb.EndTransactionRequest).InternalCommitTrigger
if crt := ict.GetChangeReplicasTrigger(); crt != nil {
// EndTransactionRequest with a ChangeReplicasTrigger is special
// because raft needs to understand it; it cannot simply be an
// opaque command.
log.Infof(ctx, "proposing %s %+v for range %d: %+v",
crt.ChangeType, crt.Replica, p.RaftCommand.RangeID, crt.UpdatedReplicas)
crt.ChangeType, crt.Replica, p.RangeID, crt.UpdatedReplicas)

confChangeCtx := ConfChangeContext{
CommandID: string(p.idKey),
Expand Down Expand Up @@ -2116,7 +2116,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
case raftpb.EntryNormal:

var commandID storagebase.CmdIDKey
var command storagebase.RaftCommand
var command storagebase.ReplicatedProposalData

// Process committed entries. etcd raft occasionally adds a nil entry
// (our own commands are never empty). This happens in two situations:
Expand Down Expand Up @@ -2154,7 +2154,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error {
if err := ccCtx.Unmarshal(cc.Context); err != nil {
return err
}
var command storagebase.RaftCommand
var command storagebase.ReplicatedProposalData
if err := command.Unmarshal(ccCtx.Payload); err != nil {
return err
}
Expand Down Expand Up @@ -2431,7 +2431,7 @@ type pendingCmdSlice []*ProposalData
func (s pendingCmdSlice) Len() int { return len(s) }
func (s pendingCmdSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s pendingCmdSlice) Less(i, j int) bool {
return s[i].RaftCommand.MaxLeaseIndex < s[j].RaftCommand.MaxLeaseIndex
return s[i].MaxLeaseIndex < s[j].MaxLeaseIndex
}

//go:generate stringer -type refreshRaftReason
Expand Down Expand Up @@ -2465,7 +2465,7 @@ func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftR
numShouldRetry := 0
var reproposals pendingCmdSlice
for idKey, p := range r.mu.proposals {
if p.RaftCommand.MaxLeaseIndex > maxMustRetryCommandIndex {
if p.MaxLeaseIndex > maxMustRetryCommandIndex {
if p.proposedAtTicks > refreshAtTicks {
// The command was proposed too recently, don't bother reproprosing
// it yet. Note that if refreshAtDelta is 0, refreshAtTicks will be
Expand Down Expand Up @@ -2716,7 +2716,10 @@ func (r *Replica) reportSnapshotStatus(to uint64, snapErr error) {
// which will apply as a no-op (without accessing raftCmd, via an error),
// updating only the applied index.
func (r *Replica) processRaftCommand(
ctx context.Context, idKey storagebase.CmdIDKey, index uint64, raftCmd storagebase.RaftCommand,
ctx context.Context,
idKey storagebase.CmdIDKey,
index uint64,
raftCmd storagebase.ReplicatedProposalData,
) (pErr *roachpb.Error) {
if index == 0 {
log.Fatalf(ctx, "processRaftCommand requires a non-zero index")
Expand Down Expand Up @@ -2827,10 +2830,10 @@ func (r *Replica) processRaftCommand(
}
r.mu.Unlock()

if splitMergeUnlock := r.maybeAcquireSplitMergeLock(raftCmd.Cmd); splitMergeUnlock != nil {
defer func() {
splitMergeUnlock(pErr)
}()
// TODO(tschottdorf): not all commands have a BatchRequest (for example,
// empty appends). Be more careful with this in proposer-eval'ed KV.
if raftCmd.Cmd == nil {
raftCmd.Cmd = &roachpb.BatchRequest{}
}

// applyRaftCommand will return "expected" errors, but may also indicate
Expand All @@ -2840,10 +2843,17 @@ func (r *Replica) processRaftCommand(
log.VEventf(ctx, 1, "applying command with forced error: %s", forcedErr)
} else {
log.Event(ctx, "applying command")

if splitMergeUnlock := r.maybeAcquireSplitMergeLock(*raftCmd.Cmd); splitMergeUnlock != nil {
defer func() {
splitMergeUnlock(pErr)
}()
}

}
var response proposalResult
{
pd := r.applyRaftCommand(ctx, idKey, index, leaseIndex, raftCmd.Cmd, forcedErr)
pd := r.applyRaftCommand(ctx, idKey, index, leaseIndex, *raftCmd.Cmd, forcedErr)
pd.Err = r.maybeSetCorrupt(ctx, pd.Err)

// TODO(tschottdorf): this field should be zeroed earlier.
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,8 @@ const (
raftCommandNoSplitMask = raftCommandNoSplitBit - 1
)

// encode a command ID, an encoded storagebase.RaftCommand, and whether the
// command contains a split.
// encode a command ID, an encoded storagebase.ReplicatedProposalData, and
// whether the command contains a split.
func encodeRaftCommand(commandID storagebase.CmdIDKey, command []byte) []byte {
if len(commandID) != raftCommandIDLen {
panic(fmt.Sprintf("invalid command ID length; %d != %d", len(commandID), raftCommandIDLen))
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,7 @@ func TestLeaseConcurrent(t *testing.T) {
var seen int32
tc.rng.mu.Lock()
tc.rng.mu.submitProposalFn = func(cmd *ProposalData) error {
ll, ok := cmd.RaftCommand.Cmd.Requests[0].
ll, ok := cmd.Cmd.Requests[0].
GetInner().(*roachpb.RequestLeaseRequest)
if !ok || !active.Load().(bool) {
return defaultSubmitProposalLocked(tc.rng, cmd)
Expand Down Expand Up @@ -5820,7 +5820,7 @@ func TestReplicaIDChangePending(t *testing.T) {
commandProposed := make(chan struct{}, 1)
rng.mu.Lock()
rng.mu.submitProposalFn = func(p *ProposalData) error {
if p.RaftCommand.Cmd.Timestamp.Equal(magicTS) {
if p.Cmd.Timestamp.Equal(magicTS) {
commandProposed <- struct{}{}
}
return nil
Expand Down Expand Up @@ -5852,7 +5852,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) {
tc.rng.mu.submitProposalFn = func(cmd *ProposalData) error {
if v := cmd.ctx.Value(magicKey{}); v != nil {
if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 {
cmd.RaftCommand.MaxLeaseIndex = wrongLeaseIndex
cmd.MaxLeaseIndex = wrongLeaseIndex
}
}
return defaultSubmitProposalLocked(tc.rng, cmd)
Expand Down Expand Up @@ -5991,7 +5991,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
tc.rng.mu.Lock()
tc.rng.mu.submitProposalFn = func(cmd *ProposalData) error {
if v := cmd.ctx.Value(magicKey{}); v != nil {
seenCmds = append(seenCmds, int(cmd.RaftCommand.MaxLeaseIndex))
seenCmds = append(seenCmds, int(cmd.MaxLeaseIndex))
}
return defaultSubmitProposalLocked(tc.rng, cmd)
}
Expand Down Expand Up @@ -6026,7 +6026,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) {
tc.rng.mu.Lock()
for _, p := range tc.rng.mu.proposals {
if v := p.ctx.Value(magicKey{}); v != nil {
origIndexes = append(origIndexes, int(p.RaftCommand.MaxLeaseIndex))
origIndexes = append(origIndexes, int(p.MaxLeaseIndex))
}
}
tc.rng.mu.Unlock()
Expand Down
Loading

0 comments on commit 11825cc

Please sign in to comment.