kvserver: improve reproposal assertions and documentation
Reproposals are a deep rabbit hole and an area in which past changes
were all related to subtle bugs. Write it all up and in particular make
some simplifications that ought to be possible if my understanding is
correct:

- have proposals always enter `(*Replica).propose` without a
  MaxLeaseIndex or prior encoded command set, i.e. `propose`
  behaves the same for reproposals as for first proposals.
- assert that after a failed call to tryReproposeWithNewLeaseIndex,
  the command is not in the proposals map, i.e. check absence of
  a leak.
- replace code that should be impossible to reach (and had me confused
  for a good amount of time) with an assertion.
- add long comment on `r.mu.proposals`.

This commit also moves `tryReproposeWithNewLeaseIndex` off `(*Replica)`,
which is possible due to recent changes[^1]. In doing so, I realized
there was a (small) data race (now fixed): when returning a
`NotLeaseHolderError` from that method, we weren't acquiring `r.mu`. It
may have looked as though we were holding it already since we're
accessing `r.mu.proposalBuf`; however, that field has special semantics - it
wraps `r.mu` and acquires it when needed.

[^1]: The "below raft" test mentioned in the previous comment was
changed in cockroachdb#93785 and
no longer causes a false positive.

Epic: CRDB-220
Release note: None
tbg committed Jan 5, 2023
1 parent d123302 commit 1d4acc7
Showing 6 changed files with 276 additions and 66 deletions.
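Before diving into the diffs, here is a minimal sketch illustrating the first bullet of the commit message: a reproposal only re-enters `propose` after the fields tying it to its previous log incarnation have been cleared, so `propose` cannot tell a reproposal apart from a first proposal. The helper below is hypothetical (the commit clears these fields inline in `tryReproposeWithNewLeaseIndex`; see the `replica_application_result.go` diff), though the field names are taken from that diff.

// Hypothetical helper, not part of this commit: reset a proposal so that
// (*Replica).propose sees it exactly like a first proposal.
func resetForRepropose(p *ProposalData) {
	p.command.MaxLeaseIndex = 0 // assigned anew when the proposal buffer flushes
	p.encodedCommand = nil      // re-marshaled to include the new MaxLeaseIndex
}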
129 changes: 123 additions & 6 deletions pkg/kv/kvserver/replica.go
@@ -470,6 +470,7 @@ type Replica struct {
// Instead, the buffer internally holds a reference to mu and will use
// it appropriately.
proposalBuf propBuf

// proposals stores the Raft in-flight commands which originated at
// this Replica, i.e. all commands for which propose has been called,
// but which have not yet applied.
@@ -481,12 +482,128 @@ type Replica struct {
// underneath raft. See comments on ProposalData fields for synchronization
// requirements.
//
// Due to Raft reproposals, multiple in-flight Raft entries can have
// the same CmdIDKey, all corresponding to the same KV request. However,
// not all Raft entries with a given command ID will correspond directly
// to the *RaftCommand contained in its associated *ProposalData. This
// is because the *RaftCommand can be mutated during reproposals by
// Replica.tryReproposeWithNewLeaseIndex.
// Due to Raft reproposals, multiple in-flight Raft entries can have the
// same CmdIDKey. There are two kinds of reproposals:
//
// (1) the exact same entry is handed to raft (possibly despite already being
// present in the log), usually after a timeout[^1].
//
// (2) an existing proposal is updated with a new MaxLeaseIndex and handed to
// raft, i.e. we're intentionally creating a duplicate. This exists because
// for pipelined proposals, the client's goroutine returns without waiting
// for the proposal to apply.[^2][^3] When (2) is carried out, the existing
// copies of the proposal in the log will be "Superseded", see below.
//
// To understand reproposals, we need a broad overview of entry application,
// which is batched (i.e. multiple log entries may be processed and applied
// together). In entry application, the following steps are taken:
//
// 1. retrieve all local proposals: iterate through the entries in order,
// and look them up in the proposals map. For each "local" entry (i.e.
// tracked in the map), remove it from the map (unless the entry is
// superseded, see below) and attach the value to the entry.
// 2. for each entry:
// - stage written and in-memory effects of the entry (some may apply as no-ops
// if they fail below-raft checks such as the MaxLeaseIndex check)
// - if the MaxLeaseIndex check failed and additional constraints are
// satisfied, carry out (2) from above. On success, we now know that there
// will be a reproposal in the log that can successfully apply. We unbind
// the local proposal (so we don't signal it) and apply the current entry
// as a no-op.
// 3. carry out additional side effects of the entire batch (stats updates etc).
//
// A prerequisite for (2) is that there currently aren't any copies of the proposal
// in the log that may ultimately apply, or we risk doubly applying commands - a
// correctness bug. After (2), any copies of the entry present in the log will have
// a MaxLeaseIndex strictly less than that of the in-memory command, and will be
// Superseded() by it.
//
// We can always safely create an identical copy (i.e. (1)) because of the
// replay protection conferred by the MaxLeaseIndex - all but the first
// proposal will be rejected (i.e. apply as a no-op).
//
// However, the combination of (1) and (2) is problematic because it
// complicates the determination of when (2) is safe. Without (1)[^4], in (2) we
// could "simply" go ahead and do the reproposal during application of the
// entry, since we now know that there is only ever one unapplied copy of the
// entry in the log (since we're creating one only as we consume one). With (1),
// we have to assume there are many copies ahead of us in the log, and possibly
// with various different MaxLeaseIndex values, in effect meaning that the
// MaxLeaseIndex of the in-memory proposal must be compared against that of
// the entry currently being applied - only if they match is the current entry
// the most recent copy, or, in other words, only if the current entry isn't
// superseded as indicated by the in-memory proposal's MaxLeaseIndex.
//
// An example follows. Consider the following situation (where N is some base
// index not relevant to the example) in which we have one inflight proposal which
// has been triplicated in the log (due to [^1]):
//
// proposals[id] = p{Cmd{MaxLeaseIndex: 100, ...}}
//
// ... (unrelated entries)
// raftlog[N] = Cmd{MaxLeaseIndex: 100, ...}
// ... (unrelated entries)
// raftlog[N+12] = (same as N)
// ... (unrelated entries)
// raftlog[N+15] = (same as N)
//
// where we assume that the `MaxLeaseIndex` 100 is invalid, i.e. when we see the
// first copy of the command being applied, we've already applied some command
// with a higher `MaxLeaseIndex`. In a world without mechanism (2), `N` would
// be rejected, and would finalize the proposal (i.e. signal the client with
// an error and remove the entry from `proposals`). Later, `N+12` and `N+15`
// would similarly be rejected (but they wouldn't even be regarded as local
// proposals any more due to not being present in `proposals`).
//
// However, (2) exists and it will engage during application of `N`: realizing
// that the current copies of the entry are all going to be rejected, it will
// modify the proposal by assigning a new `MaxLeaseIndex` to it, and handing
// it to `(*Replica).propose` again (which hands it to the proposal buffer,
// which will at some point flush it, leading to re-insertion into the raft
// log and the `proposals` map). The result will be this picture:
//
// proposals[id] = p{Cmd{MaxLeaseIndex: 192, ...}} <-- modified
//
// ... (unrelated entries)
// raftlog[N] = Cmd{MaxLeaseIndex: 100, ...} <-- applied (as no-op)
// ... (unrelated entries)
// raftlog[N+12] = (same as N) (superseded)
// ... (unrelated entries)
// raftlog[N+15] = (same as N) (superseded)
// ... (unrelated entries)
// raftlog[N+18] = Cmd{MaxLeaseIndex: 192, ...} <-- modified
//
// `N+18` might (in fact, is likely to) apply successfully. As a result, when
// we consider `N+12` or `N+15` for application, we must *not* carry out (2)
// again, or we break replay protection. In other words, the `MaxLeaseIndex`
// of the command being applied must be compared with the `MaxLeaseIndex` of
// the command in the proposals map; only if they match do we know that this
// is the most recent (in MaxLeaseIndex order) copy of the command, and only
// then can (2) engage. In addition, an entry that doesn't pass this equality
// check must neither signal the proposer nor be unlinked from the proposals
// map (as a newer reproposal is likely in the log)[^4].
//
// Another way of framing the above is that `proposals[id].Cmd.MaxLeaseIndex`
// actually tracks the maximum `MaxLeaseIndex` of all copies that may be present in
// the log.
//
// If (2) results in an error (for example, because the proposal now fails to
// respect the closed timestamp), that error finalizes the proposal and is
// returned to the client.
//
// [^1]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_raft.go#L1224
// [^2]: https://github.com/cockroachdb/cockroach/blob/59ce13b6052a99a0318e3dfe017908ff5630db30/pkg/kv/kvserver/replica_application_result.go#L148
// [^3]: it's debatable how useful this is. It was introduced in
// https://github.com/cockroachdb/cockroach/pull/35261, and perhaps could be
// phased out again if we also did
// https://github.com/cockroachdb/cockroach/issues/21849. Historical
// evidence points to https://github.com/cockroachdb/cockroach/issues/28876
// as the motivation for introducing this mechanism, i.e. it was about
// reducing failure rates early in the life of a cluster when raft
// leaderships were being determined. Perhaps we could "simply" disable
// async writes unless leadership was stable instead, by blocking on the
// proposal anyway.
// [^4]: https://github.com/cockroachdb/cockroach/blob/ab6a8650621ae798377f12bbfc1eee2fbec95480/pkg/kv/kvserver/replica_application_decoder.go#L100-L114
proposals map[kvserverbase.CmdIDKey]*ProposalData
// Indicates that the replica is in the process of applying log entries.
// Updated to true in handleRaftReady before entries are removed from
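To tie the comment above together, here is a condensed sketch of the decision it describes for a rejected local entry. The helper below does not exist in the tree; in the real code the logic is spread across `retrieveLocalProposals`, `prepareLocalResult` and `tryReproposeWithNewLeaseIndex` (see the following diffs), but the identifiers it uses match this commit.

// Illustrative sketch only (not part of this commit): handling a local entry
// that failed the below-raft MaxLeaseIndex check.
func handleIllegalLeaseIndex(ctx context.Context, cmd *replicatedCmd, r reproposer) {
	p := cmd.proposal
	if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) {
		// Another copy already applied, or a copy with a higher MaxLeaseIndex
		// is (or will be) in the log. This entry applies as a no-op and must
		// neither signal the client nor trigger another reproposal.
		return
	}
	// This is the authoritative (highest-MaxLeaseIndex) copy. Repropose it
	// under a new MaxLeaseIndex; if that fails, the error finalizes the
	// proposal and is returned to the client.
	if pErr := tryReproposeWithNewLeaseIndex(ctx, cmd, r); pErr != nil {
		cmd.response.Err = pErr
	}
	// On success, the current entry still applies as a no-op, but the client
	// is not signaled; the later, reproposed copy will do that.
}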
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_application_decoder.go
@@ -111,7 +111,8 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
// criterion. While such proposals can be reproposed, only the first
// instance that gets applied matters and so removing the command is
// always what we want to happen.
cmd.Cmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
!cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex)

if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
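The condition above delegates the old equality check to a `Supersedes` helper on `ProposalData`. A minimal implementation consistent with this diff would look roughly as follows; this is a sketch, and the receiver name and index type are assumptions (the real method may differ in detail).

// Sketch only: an entry is superseded iff the in-memory proposal has since
// been assigned a different (i.e. newer) MaxLeaseIndex than the one carried
// by the entry being applied. The uint64 index type is an assumption.
func (proposal *ProposalData) Supersedes(entryMaxLeaseIndex uint64) bool {
	return proposal.command.MaxLeaseIndex != entryMaxLeaseIndex
}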
125 changes: 97 additions & 28 deletions pkg/kv/kvserver/replica_application_result.go
@@ -107,10 +107,34 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
// new one. This is important for pipelined writes, since they don't
// have a client watching to retry, so a failure to eventually apply
// the proposal would be a user-visible error.
pErr = r.tryReproposeWithNewLeaseIndex(ctx, cmd)
pErr = tryReproposeWithNewLeaseIndex(ctx, cmd, (*replicaReproposer)(r))

if pErr != nil {
// An error from tryReproposeWithNewLeaseIndex implies that the current
// entry is not superseded (i.e. we don't have a reproposal at a higher
// MaxLeaseIndex in the log).
//
// This implies that any additional copies of the command (which may be present
// in the log ahead of the current entry) will also fail.
//
// It is thus safe to signal the error back to the client, which is also
// the only sensible choice at this point.
//
// We also know that the proposal is not in the proposals map, since the
// command is local and wasn't superseded, which is the condition in
// retrieveLocalProposals for removing from the map. So we're not leaking
// a map entry here, which we assert against below (and which has coverage,
// at time of writing, through TestReplicaReproposalWithNewLeaseIndexError).
log.Warningf(ctx, "failed to repropose with new lease index: %s", pErr)
cmd.response.Err = pErr

r.mu.RLock()
_, inMap := r.mu.proposals[cmd.ID]
r.mu.RUnlock()

if inMap {
log.Fatalf(ctx, "failed reproposal unexpectedly in proposals map: %+v", cmd)
}
} else {
// Unbind the entry's local proposal because we just succeeded
// in reproposing it and we don't want to acknowledge the client
@@ -135,36 +159,77 @@ func (r *Replica) prepareLocalResult(ctx context.Context, cmd *replicatedCmd) {
}
}

// reproposer is used by tryReproposeWithNewLeaseIndex.
type reproposer interface {
trackEvaluatingRequest(context.Context, hlc.Timestamp) (hlc.Timestamp, TrackedRequestToken)
propose(context.Context, *ProposalData, TrackedRequestToken) *roachpb.Error
newNotLeaseHolderError(string) *roachpb.NotLeaseHolderError
}

type replicaReproposer Replica

var _ reproposer = (*replicaReproposer)(nil)

func (r *replicaReproposer) trackEvaluatingRequest(
ctx context.Context, wts hlc.Timestamp,
) (hlc.Timestamp, TrackedRequestToken) {
// NB: must not hold r.mu here, the propBuf acquires it itself.
return r.mu.proposalBuf.TrackEvaluatingRequest(ctx, wts)
}

func (r *replicaReproposer) propose(
ctx context.Context, p *ProposalData, tok TrackedRequestToken,
) *roachpb.Error {
return (*Replica)(r).propose(ctx, p, tok)
}

func (r *replicaReproposer) newNotLeaseHolderError(msg string) *roachpb.NotLeaseHolderError {
r.mu.RLock()
defer r.mu.RUnlock()
return roachpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
msg,
)
}

// tryReproposeWithNewLeaseIndex is used by prepareLocalResult to repropose
// commands that have gotten an illegal lease index error, and that we know
// could not have applied while their lease index was valid (that is, we
// observed all applied entries between proposal and the lease index becoming
// invalid, as opposed to skipping some of them by applying a snapshot).
//
// It is not intended for use elsewhere and is only a top-level function so that
// it can avoid the below_raft_protos check. Returns a nil error if the command
// has already been successfully applied or has been reproposed here or by a
// different entry for the same proposal that hit an illegal lease index error.
func (r *Replica) tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd,
// Returns a nil error if the command has already been successfully applied or
// has been reproposed here or by a different entry for the same proposal that
// hit an illegal lease index error.
//
// If this returns a nil error once, it will return a nil error for future calls
// as well, assuming that trackEvaluatingRequest returns monotonically increasing
// timestamps across subsequent calls.
func tryReproposeWithNewLeaseIndex(
ctx context.Context, cmd *replicatedCmd, r reproposer,
) *roachpb.Error {
// Note that we don't need to validate anything about the proposal's
// lease here - if we got this far, we know that everything but the
// index is valid at this point in the log.
p := cmd.proposal
if p.applied || cmd.Cmd.MaxLeaseIndex != p.command.MaxLeaseIndex {
// If the command associated with this rejected raft entry already
// applied then we don't want to repropose it. Doing so could lead
// to duplicate application of the same proposal.
if p.applied || p.Supersedes(cmd.Cmd.MaxLeaseIndex) {
// If the command associated with this rejected raft entry already applied
// then we don't want to repropose it. Doing so could lead to duplicate
// application of the same proposal. (We can hit this case if an application
// batch contains multiple copies of the same proposal, in which case they are
// all marked as local; the first one will apply (and set p.applied) and the
// remaining copies will hit this branch.)
//
// Similarly, if the command associated with this rejected raft
// entry has a different (larger) MaxLeaseIndex than the one we
// decoded from the entry itself, the command must have already
// been reproposed (this can happen if there are multiple copies
// of the command in the logs; see TestReplicaRefreshMultiple).
// We must not create multiple copies with multiple lease indexes,
// so don't repropose it again. This ensures that at any time,
// there is only up to a single lease index that has a chance of
// Similarly, if the proposal associated with this rejected raft entry is
// superseded by a different (larger) MaxLeaseIndex than the one we decoded
// from the entry itself, the command must have already passed through
// tryReproposeWithNewLeaseIndex previously (this can happen if there are
// multiple copies of the command in the logs; see
// TestReplicaRefreshMultiple). We must not create multiple copies with
// multiple lease indexes, so don't repropose it again. This ensures that at
// any time, there is only up to a single lease index that has a chance of
// succeeding in the Raft log for a given command.
return nil
}
@@ -173,27 +238,31 @@ func (r *Replica) tryReproposeWithNewLeaseIndex(
// it gets reproposed.
// TODO(andrei): Only track if the request consults the ts cache. Some
// requests (e.g. EndTxn) don't care about closed timestamps.
minTS, tok := r.mu.proposalBuf.TrackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
minTS, tok := r.trackEvaluatingRequest(ctx, p.Request.WriteTimestamp())
defer tok.DoneIfNotMoved(ctx)

// NB: p.Request.Timestamp reflects the action of ba.SetActiveTimestamp.
if p.Request.AppliesTimestampCache() && p.Request.WriteTimestamp().LessEq(minTS) {
// The tracker wants us to forward the request timestamp, but we can't
// do that without re-evaluating, so give up. The error returned here
// will go to back to DistSender, so send something it can digest.
err := roachpb.NewNotLeaseHolderError(
*r.mu.state.Lease,
r.store.StoreID(),
r.mu.state.Desc,
"reproposal failed due to closed timestamp",
)
return roachpb.NewError(err)
// will go back to DistSender, so send something it can digest.
return roachpb.NewError(r.newNotLeaseHolderError("reproposal failed due to closed timestamp"))
}
// Some tests check for this log message in the trace.
log.VEventf(ctx, 2, "retry: proposalIllegalLeaseIndex")

// Reset the command for reproposal.
prevMaxLeaseIndex := p.command.MaxLeaseIndex
prevEncodedCommand := p.encodedCommand
p.command.MaxLeaseIndex = 0
p.encodedCommand = nil
pErr := r.propose(ctx, p, tok.Move(ctx))
if pErr != nil {
// On error, reset the fields we zeroed out to their old value.
// This ensures that the proposal doesn't count as Superseded
// now.
p.command.MaxLeaseIndex = prevMaxLeaseIndex
p.encodedCommand = prevEncodedCommand
return pErr
}
log.VEventf(ctx, 2, "reproposed command %x", cmd.ID)
34 changes: 17 additions & 17 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -220,31 +220,31 @@ func (sm *replicaStateMachine) ApplySideEffects(
sm.r.handleReadWriteLocalEvalResult(ctx, *cmd.localResult)
}

rejected := cmd.Rejected()
higherReproposalsExist := cmd.Cmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex
if !rejected && higherReproposalsExist {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
if higherReproposalsExist := cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex); higherReproposalsExist {
// If the command wasn't rejected, we just applied it and no higher
// reproposal must exist (since that one may also apply).
//
// If the command was rejected with ProposalRejectionPermanent, no higher
// reproposal should exist (after all, whoever made that reproposal should
// also have seen a permanent rejection).
//
// If it was rejected with ProposalRejectionIllegalLeaseIndex, then the
// subsequent call to tryReproposeWithNewLeaseIndex[^1] must have returned an
// error (or the proposal would not be IsLocal() now). But that call
// cannot return an error for a proposal that is already superseded
// initially.
//
// [^1]: see (*replicaDecoder).retrieveLocalProposals()
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index: %+v", cmd)
}
if !rejected && cmd.proposal.applied {
if !cmd.Rejected() && cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
// If any reproposals at a higher MaxLeaseIndex exist we know that they will
// never successfully apply, remove them from the map to avoid future
// reproposals. If there is no command referencing this proposal at a higher
// MaxLeaseIndex then it will already have been removed (see
// shouldRemove in replicaDecoder.retrieveLocalProposals()). It is possible
// that a later command in this batch referred to this proposal but it must
// have failed because it carried the same MaxLeaseIndex.
if higherReproposalsExist {
sm.r.mu.Lock()
delete(sm.r.mu.proposals, cmd.ID)
sm.r.mu.Unlock()
}
cmd.proposal.applied = true
}
return cmd, nil
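In condensed form, the two invariants that `ApplySideEffects` now asserts for local proposals are sketched below. The checks are taken verbatim from the hunk above; the wrapper function itself is for illustration only and is not part of this commit.

// Illustration only: invariants asserted for local proposals in ApplySideEffects.
func assertApplyInvariants(ctx context.Context, cmd *replicatedCmd) {
	// 1. The entry being finished is never superseded by a newer reproposal.
	if cmd.proposal.Supersedes(cmd.Cmd.MaxLeaseIndex) {
		log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index: %+v", cmd)
	}
	// 2. A successfully applying entry never refers to an already-applied proposal.
	if !cmd.Rejected() && cmd.proposal.applied {
		log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
	}
}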