From 50770af24c98323a90cd3306758753f95f1010c8 Mon Sep 17 00:00:00 2001
From: Tobias Schottdorf
Date: Tue, 18 Oct 2016 04:56:08 -0400
Subject: [PATCH] storage: evaluate proposals without lock

The main objective of this change is to (later) enable proposer-evaluated KV
to compute the `WriteBatch` when evaluating a proposal (i.e. when turning a
`BatchRequest` into a `*ProposalData`). The previous code assumed that making
a proposal is cheap and thus did it under large critical sections, which in
particular encompassed the command reproposal and refurbishment logic.
Unwinding the latter makes this change relatively subtle, and it should
receive the appropriate scrutiny.

In particular,

- externally triggered proposal refreshes (such as after an election, or
  periodic ones) are now queued with the scheduler and executed by it. This
  is necessary because the goroutines requesting the refresh must not be
  blocked by (potentially extensive) disk I/O.
- similarly, refurbishments during command application were offloaded, though
  to a goroutine.
- MaxLeaseIndex is assigned at the latest possible moment, just before
  submitting to Raft. Multiple inbound requests can pass the command queue
  without interacting, and the previous code did little to ensure that these
  proposals were submitted to Raft in the order in which their lease indexes
  were assigned; it didn't need to, since assignment and submission happened
  inside one large critical section. Now that this critical section has been
  split, the assignment had to move.

An alternative, namely bubbling the refurbishment up to the client's done
channel (the client would then have to repropose it as a completely new
command), was not pursued here, but may be preferable in the long run.

At the time of writing, this change is WIP and depends on yet-unmerged #10004.
To allow reviewing, this PR was opened against a branch containing that PR.
---
 pkg/storage/replica.go | 413 +++++++++++++++++++++++-----------
 pkg/storage/replica_test.go | 188 ++++++++++------
 pkg/storage/scheduler.go | 22 ++
 pkg/storage/scheduler_test.go | 24 +-
 pkg/storage/store.go | 18 ++
 5 files changed, 461 insertions(+), 204 deletions(-)

diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 44dc24cb52c2..3309a6fc631e 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -280,6 +280,13 @@ type Replica struct {
// propose has been called, but which have not yet
// applied.
//
+ // The *ProposalData in the map are "owned" by it in the sense that
+ // anyone wishing to hold on to a *ProposalData from the map while not
+ // holding on to the map (i.e. holding Replica.mu) must remove it from
+ // the map first. This notably happens when a command needs
+ // refurbishment, in which case it is removed from the map, reevaluated
+ // (not under Replica.mu), and then reinserted.
+ //
// TODO(tschottdorf): evaluate whether this should be a list/slice.
proposals map[storagebase.CmdIDKey]*ProposalData
internalRaftGroup *raft.RawNode
@@ -670,10 +677,8 @@ func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error {
// If there was a previous replica, repropose its pending commands under
// this new incarnation.
if previousReplicaID != 0 {
- // propose pending commands under new replicaID
- if err := r.refreshPendingCmdsLocked(reasonReplicaIDChanged, 0); err != nil {
- return err
- }
+ // repropose all pending commands under new replicaID.
+ r.queueRefreshAllPendingCmds(reasonReplicaIDChanged) } return nil @@ -1489,12 +1494,42 @@ func (r *Replica) assert5725(ba roachpb.BatchRequest) { } } -// addWriteCmd first adds the keys affected by this command as pending writes -// to the command queue. Next, the timestamp cache is checked to determine if -// any newer accesses to this command's affected keys have been made. If so, -// the command's timestamp is moved forward. Finally, the command is submitted -// to Raft. Upon completion, the write is removed from the command queue and any -// error returned. +// addWriteCmd is client request's entry point for commands which may mutate +// the Range's replicated state. Requests taking this path are ultimately +// serialized through Raft, but pass through additional machinery whose goal is +// to allow commands which commute to be proposed in parallel. The naive +// alternative (submitting requests to Raft one after another, paying massive +// latency) is only taken for commands whose effects may overlap. +// +// Concretely, +// +// - the keys affected by the command are added to the command queue (i.e. +// tracked as in-flight mutations). +// - wait until the command group promises that no overlapping mutations are +// in flight. +// - the timestamp cache is checked to determine if the command's affected keys +// were accessed with a timestamp exceeding that of the command; if so, the +// command's timestamp is incremented accordingly. +// - the command is evaluated, i.e. a WriteBatch containing the raw effects of +// the command's application is computed along with some auxiliary data, +// resulting in a ProposalData. +// - the ProposalData is inserted into the Replica's in-flight proposals map, +// a lease index is assigned to it, and it is submitted to Raft. [*] +// - Upon completion, the command is registered in the timestamp cache, removed +// from the command queue, and its result (which could be an error) returned +// to the client waiting on the channel returned by this method. +// +// [*]: the life of a proposal between being submitted and applied is complex +// due to dealing with replay protection, see refreshPendingCmds for details. +// Briefly put, the command may have to be submitted or even reevaluated +// multiple times, until a lease index is chosen (at time of submitting) which +// is legal (at time of Raft application). +// +// TODO(tschottdorf): take special care with "special" commands and their +// reorderings. For example, a batch of writes and a split could be in flight +// in parallel without overlap, but if the writes hit the RHS, something must +// prevent them from writing outside of the key range when they apply. +// Similarly, a command proposed under lease A must not apply under lease B. func (r *Replica) addWriteCmd( ctx context.Context, ba roachpb.BatchRequest, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { @@ -1591,30 +1626,29 @@ func (r *Replica) addWriteCmd( return br, pErr } -// TODO(tschottdorf): for proposer-evaluated Raft, need to refactor so that -// this does not happen under Replica.mu. -func (r *Replica) evaluateProposalLocked( +// evaluateProposal generate ProposalData from the given request by evaluating +// it, returning both state which is held only on the proposer and that which +// is to be replicated through Raft. The return value is ready to be inserted +// into Replica's proposal map and subsequently passed to submitProposalLocked. +// +// Replica.mu must not be held. 
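To tie the pieces together, here is an illustrative, self-contained sketch of the write path described in the addWriteCmd comment above. It is not code from this patch: batchRequest, proposal, beginCmdQueue, forwardTimestamp, evaluate and submitToRaft are invented stand-ins for the real command queue, timestamp cache, evaluation and Raft machinery.

    package main

    import "fmt"

    // Stand-in types; the real path uses roachpb.BatchRequest and ProposalData.
    type batchRequest struct{ keys []string }
    type proposal struct{ done chan error }

    // Stubs standing in for the command queue, timestamp cache, evaluation
    // and Raft submission steps described above.
    func beginCmdQueue(keys []string) func() { return func() {} }
    func forwardTimestamp(ba *batchRequest)  {}
    func evaluate(ba batchRequest) *proposal { return &proposal{done: make(chan error, 1)} }
    func submitToRaft(p *proposal)           { p.done <- nil }

    func addWriteCmd(ba batchRequest) error {
        finish := beginCmdQueue(ba.keys) // wait out overlapping in-flight writes
        defer finish()
        forwardTimestamp(&ba) // bump the timestamp past newer accesses to these keys
        p := evaluate(ba)     // compute the proposal without holding Replica.mu
        submitToRaft(p)       // insert into the proposal map, assign a lease index, propose
        return <-p.done       // the result arrives once the command applies (or fails)
    }

    func main() { fmt.Println(addWriteCmd(batchRequest{keys: []string{"a"}})) }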
+// +// TODO(tschottdorf): with proposer-evaluated KV, a WriteBatch will be prepared +// in this method. +func (r *Replica) evaluateProposal( ctx context.Context, idKey storagebase.CmdIDKey, replica roachpb.ReplicaDescriptor, ba roachpb.BatchRequest, ) *ProposalData { - if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex { - r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex - } - if !ba.IsLeaseRequest() { - r.mu.lastAssignedLeaseIndex++ - } - if log.V(4) { - log.Infof(ctx, "prepared command %x: maxLeaseIndex=%d leaseAppliedIndex=%d", - idKey, r.mu.lastAssignedLeaseIndex, r.mu.state.LeaseAppliedIndex) - } + // Note that we don't hold any locks at this point. This is important + // since evaluating a proposal is expensive (at least under proposer- + // evaluated KV). var pd ProposalData pd.RaftCommand = &storagebase.RaftCommand{ RangeID: r.RangeID, OriginReplica: replica, Cmd: ba, - MaxLeaseIndex: r.mu.lastAssignedLeaseIndex, } pd.ctx = ctx pd.idKey = idKey @@ -1661,16 +1695,23 @@ func (r *Replica) propose( // has a tiny (~1%) performance hit for single-node block_writer testing. r.raftMu.Lock() defer r.raftMu.Unlock() + r.mu.Lock() - defer r.mu.Unlock() if r.mu.destroyed != nil { + r.mu.Unlock() return nil, nil, r.mu.destroyed } repDesc, err := r.getReplicaDescriptorLocked() if err != nil { + r.mu.Unlock() return nil, nil, err } - pCmd := r.evaluateProposalLocked(ctx, makeIDKey(), repDesc, ba) + r.mu.Unlock() + + pCmd := r.evaluateProposal(ctx, makeIDKey(), repDesc, ba) + + r.mu.Lock() + defer r.mu.Unlock() r.insertProposalLocked(pCmd) if err := r.submitProposalLocked(pCmd); err != nil { @@ -1691,6 +1732,23 @@ func (r *Replica) propose( // The replica lock must be held. func (r *Replica) submitProposalLocked(p *ProposalData) error { p.proposedAtTicks = r.mu.ticks + // Assign a lease index. Note that we do this as late as possible + // to make sure (to the extent that we can) that we don't assign + // (=predict) the index differently from the order in which commands are + // proposed (and thus likely applied). + isLeaseReq := p.RaftCommand.Cmd.IsLeaseRequest() + if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex { + r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex + } + if !isLeaseReq { + r.mu.lastAssignedLeaseIndex++ + } + p.RaftCommand.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex + if log.V(4) { + log.Infof(p.ctx, "submitting proposal %x: maxLeaseIndex=%d", + p.idKey, p.RaftCommand.MaxLeaseIndex) + } + if r.mu.submitProposalFn != nil { return r.mu.submitProposalFn(p) } @@ -2026,12 +2084,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error { } } if refreshReason != noReason { - r.mu.Lock() - err := r.refreshPendingCmdsLocked(refreshReason, 0) - r.mu.Unlock() - if err != nil { - return err - } + // TODO(tschottdorf): decide whether to queue instead: + // r.queueRefreshAllPendingCmds(refreshReason) + // Do before merge. + r.refreshPendingCmds(0) } // TODO(bdarnell): need to check replica id and not Advance if it @@ -2045,64 +2101,64 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error { // tick the Raft group, returning any error and true if the raft group exists // and false otherwise. +// TODO(tschottdorf): unused error return. 
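The late MaxLeaseIndex assignment that submitProposalLocked now performs (above) boils down to a small piece of bookkeeping: the counter never trails the applied lease index, and lease requests do not consume a new index. A minimal sketch of just that rule, using simplified stand-ins for r.mu.lastAssignedLeaseIndex and r.mu.state.LeaseAppliedIndex:

    package main

    import "fmt"

    // Simplified stand-ins for r.mu.lastAssignedLeaseIndex and
    // r.mu.state.LeaseAppliedIndex.
    type leaseIndexState struct {
        lastAssigned      uint64
        leaseAppliedIndex uint64
    }

    // assign mirrors the bookkeeping submitProposalLocked now performs under
    // Replica.mu: the counter never trails the applied index, and lease
    // requests do not claim a new index.
    func (s *leaseIndexState) assign(isLeaseRequest bool) uint64 {
        if s.lastAssigned < s.leaseAppliedIndex {
            s.lastAssigned = s.leaseAppliedIndex
        }
        if !isLeaseRequest {
            s.lastAssigned++
        }
        return s.lastAssigned // becomes the proposal's MaxLeaseIndex
    }

    func main() {
        s := leaseIndexState{lastAssigned: 3, leaseAppliedIndex: 7}
        fmt.Println(s.assign(false)) // 8: catch up to the applied index, then increment
        fmt.Println(s.assign(true))  // 8: a lease request does not claim a new index
    }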
func (r *Replica) tick() (bool, error) { - r.raftMu.Lock() - defer r.raftMu.Unlock() - return r.tickRaftMuLocked() -} - -func (r *Replica) tickRaftMuLocked() (bool, error) { - r.mu.Lock() - defer r.mu.Unlock() + ticked, shouldRefresh := func() (bool, bool) { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() - // If the raft group is uninitialized, do not initialize raft groups on - // tick. - if r.mu.internalRaftGroup == nil { - return false, nil - } - if r.mu.quiescent { - // While a replica is quiesced we still advance its logical clock. This is - // necessary to avoid a scenario where the leader quiesces and a follower - // does not. The follower calls an election but the election fails because - // the leader and other follower believe that no time in the current term - // has passed. The Raft group is then in a state where one member has a - // term that is advanced which will then cause subsequent heartbeats from - // the existing leader to be rejected in a way that the leader will step - // down. This situation is caused by an interaction between quiescence and - // the Raft CheckQuorum feature which relies on the logical clock ticking - // at roughly the same rate on all members of the group. - // - // By ticking the logical clock (incrementing an integer) we avoid this - // situation. If one of the followers does not quiesce it will call an - // election but the election will succeed. Note that while we expect such - // elections from quiesced followers to be extremely rare, it is very - // difficult to completely eliminate them so we want to minimize the - // disruption when they do occur. - // - // For more details, see #9372. - r.mu.internalRaftGroup.TickQuiesced() - return false, nil - } - if r.maybeQuiesceLocked() { - return false, nil - } + // If the raft group is uninitialized, do not initialize raft groups on + // tick. + if r.mu.internalRaftGroup == nil { + return false, false + } + if r.mu.quiescent { + // While a replica is quiesced we still advance its logical clock. This is + // necessary to avoid a scenario where the leader quiesces and a follower + // does not. The follower calls an election but the election fails because + // the leader and other follower believe that no time in the current term + // has passed. The Raft group is then in a state where one member has a + // term that is advanced which will then cause subsequent heartbeats from + // the existing leader to be rejected in a way that the leader will step + // down. This situation is caused by an interaction between quiescence and + // the Raft CheckQuorum feature which relies on the logical clock ticking + // at roughly the same rate on all members of the group. + // + // By ticking the logical clock (incrementing an integer) we avoid this + // situation. If one of the followers does not quiesce it will call an + // election but the election will succeed. Note that while we expect such + // elections from quiesced followers to be extremely rare, it is very + // difficult to completely eliminate them so we want to minimize the + // disruption when they do occur. + // + // For more details, see #9372. 
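As an aside before the remainder of the function: the restructured tick follows a decide-under-lock, act-outside-lock pattern. The mutexes are held only to compute whether a refresh is due; the refresh itself, which may involve disk I/O, runs after they are released. A minimal sketch of that pattern with invented stand-in fields; the real code also holds raftMu and consults the store config and testing knobs.

    package main

    import "sync"

    type replica struct {
        mu        sync.Mutex
        ticks     int
        timeout   int
        quiescent bool
    }

    func (r *replica) tick() {
        shouldRefresh := func() bool {
            r.mu.Lock()
            defer r.mu.Unlock()
            if r.quiescent {
                return false // a quiesced replica neither counts ticks nor refreshes in this sketch
            }
            r.ticks++
            return r.ticks%r.timeout == 0
        }()
        if shouldRefresh {
            r.refreshPendingCmds() // may involve disk I/O; must not run under r.mu
        }
    }

    func (r *replica) refreshPendingCmds() {}

    func main() {
        r := &replica{timeout: 3}
        for i := 0; i < 6; i++ {
            r.tick() // a refresh is attempted on ticks 3 and 6
        }
    }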
+ r.mu.internalRaftGroup.TickQuiesced()
+ return false, false
+ }
+ if r.maybeQuiesceLocked() {
+ return false, false
+ }
- r.mu.ticks++
- r.mu.internalRaftGroup.Tick()
- if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
- r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0 {
+ r.mu.ticks++
+ r.mu.internalRaftGroup.Tick()
// RaftElectionTimeoutTicks is a reasonable approximation of how long we
// should wait before deciding that our previous proposal didn't go
- // through. Note that the combination of the above condition and passing
- // RaftElectionTimeoutTicks to refreshPendingCmdsLocked means that commands
- // will be refreshed when they have been pending for 1 to 2 election
- // cycles.
- if err := r.refreshPendingCmdsLocked(
- reasonTicks, r.store.cfg.RaftElectionTimeoutTicks); err != nil {
- return true, err
- }
+ // through. In effect, proposals will be refreshed when they have been
+ // pending for 1 to 2 election cycles.
+ refresh := !r.store.TestingKnobs().DisableRefreshReasonTicks &&
+ r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0
+ return true, refresh
+ }()
+ if shouldRefresh {
+ // TODO(tschottdorf): decide whether to queue instead:
+ // r.queueRefreshAllPendingCmds(reasonTicks)
+ // Do before merge.
+ _ = reasonTicks
+ r.refreshPendingCmds(r.store.cfg.RaftElectionTimeoutTicks)
}
- return true, nil
+ return ticked, nil
}

var enableQuiescence = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_QUIESCENCE", true)

@@ -2292,19 +2348,53 @@ const (
reasonTicks
)

-func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDelta int) error {
- if len(r.mu.proposals) == 0 {
- return nil
- }
+// queueRefreshStalePendingCmds causes a future asynchronous call to
+// refreshPendingCmds with RaftElectionTimeoutTicks as the parameter.
+func (r *Replica) queueRefreshStalePendingCmds(reason refreshRaftReason) {
+ log.Warningf(r.ctx, "queueing refresh of stale pending commands: %s", reason)
+ r.store.scheduler.EnqueueRaftRefreshStale(r.RangeID)
+}
+
+// TODO(tschottdorf): remove before merge if it remains unused.
+var _ func(refreshRaftReason) = (*Replica)(nil).queueRefreshStalePendingCmds

- // Note that we can't use the commit index here (which is typically a
- // little ahead), because a pending command is removed only as it applies.
- // Thus we'd risk reproposing a command that has been committed but not yet
- // applied.
- maxWillRefurbish := r.mu.state.LeaseAppliedIndex // indexes <= will be refurbished
+// queueRefreshAllPendingCmds causes a future asynchronous call to
+// refreshPendingCmds which refreshes all pending proposals.
+func (r *Replica) queueRefreshAllPendingCmds(reason refreshRaftReason) {
+ log.Warningf(r.ctx, "queueing refresh of all pending commands: %s", reason)
+ r.store.scheduler.EnqueueRaftRefreshAll(r.RangeID)
+}
+
+// refreshPendingCmds goes through the pending proposals, refurbishing what it
+// can and reproposing the rest.
+//
+// Refurbishing roughly amounts to being allowed to assume that the
+// previously submitted command can no longer apply successfully (because the
+// applied lease index has surpassed the proposal's MaxLeaseIndex).
+// Reproposing means simply taking the in-flight proposal and submitting it
+// to Raft again; this is what we have to do for everything that couldn't be
+// refurbished, since refurbishing those commands anyway could lead to some
+// proposals applying twice.
+// +// refreshAtDelta specifies how old (in ticks) +// a command must be for it to be inspected; usually this is called with zero +// (affect everything) or the number of ticks of an election timeout (affect +// only proposals that have had ample time to apply but didn't). +// +// TODO(tschottdorf): rename to refreshProposals. +func (r *Replica) refreshPendingCmds(refreshAtDelta int) { + var refurbish []*ProposalData + var repropose pendingCmdSlice + + r.mu.Lock() + // indexes <= maxWillRefurbish will be refurbished. Note that we can't use + // the commit index here (which is typically a little ahead of + // LeaseAppliedIndex) because a pending command is removed only as it + // applies. Thus we'd risk reproposing a command that has been committed + // but not yet applied. + maxWillRefurbish := r.mu.state.LeaseAppliedIndex refreshAtTicks := r.mu.ticks - refreshAtDelta - refurbished := 0 - var reproposals pendingCmdSlice + numPending := len(r.mu.proposals) for idKey, p := range r.mu.proposals { if p.proposedAtTicks > refreshAtTicks { // The command was proposed too recently, don't bother reproprosing or @@ -2313,22 +2403,17 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe continue } if p.RaftCommand.MaxLeaseIndex > maxWillRefurbish { - reproposals = append(reproposals, p) + repropose = append(repropose, p) continue } + // The command can be refurbished. Delete it from the map; hold on to + // it to repropose it once we've unlocked the Replica. delete(r.mu.proposals, idKey) - // The command can be refurbished. - log.Eventf(p.ctx, "refurbishing command %x; %s", p.idKey, reason) - if pErr := r.refurbishPendingCmdLocked(p); pErr != nil { - p.done <- roachpb.ResponseWithError{Err: pErr} - } - refurbished++ - } - if log.V(1) && (refurbished > 0 || len(reproposals) > 0) { - log.Infof(r.ctx, - "pending commands: refurbished %d, reproposing %d (at %d.%d); %s", - refurbished, len(reproposals), r.mu.state.RaftAppliedIndex, - r.mu.state.LeaseAppliedIndex, reason) + log.Eventf( + p.ctx, "refurbishing command %x (%d ticks old)", + p.idKey, r.mu.ticks-p.proposedAtTicks, + ) + refurbish = append(refurbish, p) } // Reproposals are those commands which we weren't able to refurbish (since @@ -2336,14 +2421,37 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe // index). // For reproposals, it's generally pretty unlikely that they can make it in // the right place. Reproposing in order is definitely required, however. - sort.Sort(reproposals) - for _, p := range reproposals { - log.Eventf(p.ctx, "reproposing command %x; %s", p.idKey, reason) + // + // It makes sense to repropose before refurbishing: If a reproposal can + // make it, then only if no other lease index has been assigned and + // successfully applied after (which refurbishing first would essentially + // guarantee). 
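Stated as a standalone rule, the classification performed by the loop above is: proposals younger than refreshAtDelta ticks are skipped, proposals whose MaxLeaseIndex is still ahead of the applied lease index are reproposed, and the rest are refurbished. A self-contained sketch of just that decision, using simplified signatures rather than the real *ProposalData plumbing:

    package main

    import "fmt"

    type action int

    const (
        skip      action = iota // proposed too recently; leave it alone for now
        repropose               // could still apply at its lease index; resubmit as-is
        refurbish               // its lease index has been surpassed; reevaluate and resubmit
    )

    // classify captures the decision only; the real code operates on
    // *ProposalData under Replica.mu and also removes refurbished commands
    // from the proposal map.
    func classify(
        proposedAtTicks, nowTicks, refreshAtDelta int, maxLeaseIndex, leaseAppliedIndex uint64,
    ) action {
        if proposedAtTicks > nowTicks-refreshAtDelta {
            return skip
        }
        if maxLeaseIndex > leaseAppliedIndex {
            return repropose
        }
        return refurbish
    }

    func main() {
        fmt.Println(classify(9, 10, 3, 5, 7))  // 0 (skip): only one tick old
        fmt.Println(classify(2, 10, 3, 12, 7)) // 1 (repropose): lease index 12 not yet surpassed
        fmt.Println(classify(2, 10, 3, 5, 7))  // 2 (refurbish): the applied index is already at 7
    }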
+ sort.Sort(repropose) + for _, p := range repropose { + log.Eventf(p.ctx, "reproposing command %x", p.idKey) if err := r.submitProposalLocked(p); err != nil { - return err + delete(r.mu.proposals, p.idKey) + p.done <- roachpb.ResponseWithError{Err: roachpb.NewError(err)} + close(p.done) } } - return nil + + raftAppliedIndex := r.mu.state.RaftAppliedIndex + leaseAppliedIndex := r.mu.state.LeaseAppliedIndex + r.mu.Unlock() + + for _, p := range refurbish { + if pErr := r.refurbishPendingCmd(p); pErr != nil { + p.done <- roachpb.ResponseWithError{Err: pErr} + close(p.done) + } + } + + if log.V(1) && (len(refurbish) > 0 || len(repropose) > 0) { + log.Infof(r.ctx, + "pending commands: refurbished %d, reproposing %d (of %d at %d.%d)", + len(refurbish), len(repropose), numPending, raftAppliedIndex, leaseAppliedIndex) + } } func (r *Replica) getReplicaDescriptorByIDLocked( @@ -2485,17 +2593,33 @@ func (r *Replica) reportSnapshotStatus(to uint64, snapErr error) { } } -// refurbishPendingCmdLocked takes a pendingCmd which was discovered to apply -// at a log position other than the one at which it was originally proposed -// (this can happen when the range lease held by a raft follower, who must -// forward MsgProp messages to the raft leader without guaranteed ordering). -// It inserts and proposes a new command, returning an error if that fails. -// The passed command must have been deleted from r.mu.proposals. -func (r *Replica) refurbishPendingCmdLocked(cmd *ProposalData) *roachpb.Error { +// refurbishPendingCmd takes a pendingCmd which was discovered to apply at +// a log position other than the one at which it was originally proposed (this +// can happen when the range lease held by a raft follower, who must forward +// MsgProp messages to the raft leader without guaranteed ordering). It inserts +// and proposes a new command, returning an error if that fails. +// +// The passed command must have been deleted from r.mu.proposals prior to the +// call to this method, and it must have been checked that the command is +// eligible for refurbishment (i.e. commands have applied at a lease index +// greater or equal to that of this command). +// +// TODO(tschottdorf): now that this isn't under a continuous lock with the +// removal and reinsertion any more, make sure that we can't end up in +// situations in which a command is refurbished and applies multiple times. +// I *think* that is ok since we always remove the command from the proposal +// map and we *only* do so if we can refurbish (meaning that all potential +// refurbishers serialize through the proposals map). +// +// TODO(tschottdorf): s/refurbish/reevaluate/g. +func (r *Replica) refurbishPendingCmd(cmd *ProposalData) *roachpb.Error { // Note that the new command has the same idKey (which matters since we // leaked that to the pending client). - newPCmd := r.evaluateProposalLocked(cmd.ctx, cmd.idKey, + newPCmd := r.evaluateProposal(cmd.ctx, cmd.idKey, cmd.RaftCommand.OriginReplica, cmd.RaftCommand.Cmd) + + r.mu.Lock() + defer r.mu.Unlock() newPCmd.done = cmd.done r.insertProposalLocked(newPCmd) if err := r.submitProposalLocked(newPCmd); err != nil { @@ -2606,23 +2730,48 @@ func (r *Replica) processRaftCommand( // already gone through the trouble of doing so (which would have // changed our local copy of the pending command). We want to error // out, but keep the pending command (i.e. not tell the client) - // so that the future incarnation can apply and notify the it. 
- // Note that we keep the context to avoid hiding these internal - // cycles from traces. + // so that the future incarnation can apply and notify the client. + // Note that we keep the context to expose these internal cycles + // in the request trace. if localMaxLeaseIndex := cmd.RaftCommand.MaxLeaseIndex; localMaxLeaseIndex <= raftCmd.MaxLeaseIndex { log.VEventf( ctx, 1, "refurbishing command %x; <= %d observed at %d", cmd.idKey, raftCmd.MaxLeaseIndex, leaseIndex, ) - if pErr := r.refurbishPendingCmdLocked(cmd); pErr == nil { - cmd.done = make(chan roachpb.ResponseWithError, 1) - } else { - // We could try to send the error to the client instead, - // but to avoid even the appearance of Replica divergence, - // let's not. - log.Warningf(ctx, "unable to refurbish: %s", pErr) - } + // Once we're done processing this command (which is going to + // error out, with the error redirected away from the client), + // refurbish the command. We do this asynchronously because + // it's expensive and shouldn't block this goroutine. + cmdCopy := *cmd // copy for the goroutine + cmd.done = make(chan roachpb.ResponseWithError, 1) + forcedErrCopy := protoutil.Clone(forcedErr).(*roachpb.Error) + defer func() { + // TODO(tschottdorf) before merge, decide where this work + // should be done. Simply spawning a goroutine seems wrong. + if err := r.store.Stopper().RunAsyncTask(ctx, func(ctx context.Context) { + if pErr := r.refurbishPendingCmd(&cmdCopy); pErr != nil { + log.Warning(ctx, errors.Wrap(pErr.GoError(), "while trying to refurbish")) + // Send the forced error back to avoid even the + // impression of replica divergence. + // + // Note that we took a copy; this is mostly to follow + // protocol (as we know there's no client on the + // channel receiving the original). + cmdCopy.done <- roachpb.ResponseWithError{Err: forcedErrCopy} + close(cmdCopy.done) + } + }); err != nil { + // If a client is waiting on this command's done + // channel and can't abandon the command, this code + // path would deadlock the client if we didn't send it + // the error. + cmdCopy.done <- roachpb.ResponseWithError{ + Err: roachpb.NewError(err), + } + close(cmdCopy.done) + } + }() } else { // The refurbishment is already in flight, so we better get cmd back // into proposals (the alternative, not deleting it in this case diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 7bcc00b5f3f5..427ce05a4256 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -5782,7 +5782,11 @@ func TestReplicaIDChangePending(t *testing.T) { defer leaktest.AfterTest(t)() tc := testContext{} - tc.Start(t) + cfg := TestStoreConfig() + // Disable ticks to avoid automatic reproposals after a timeout, which + // would pass this test. + cfg.RaftTickInterval = time.Hour + tc.StartWithStoreConfig(t, cfg) defer tc.Stop() rng := tc.rng @@ -5809,24 +5813,20 @@ func TestReplicaIDChangePending(t *testing.T) { // re-proposed. commandProposed := make(chan struct{}, 1) rng.mu.Lock() - defer rng.mu.Unlock() rng.mu.submitProposalFn = func(p *ProposalData) error { if p.RaftCommand.Cmd.Timestamp.Equal(magicTS) { commandProposed <- struct{}{} } return nil } + rng.mu.Unlock() // Set the ReplicaID on the replica. 
- if err := rng.setReplicaIDLocked(2); err != nil { + if err := rng.setReplicaID(2); err != nil { t.Fatal(err) } - select { - case <-commandProposed: - default: - t.Fatal("command was not re-proposed") - } + <-commandProposed } // runWrongIndexTest runs a reproposal or refurbishment test, optionally @@ -5839,29 +5839,31 @@ func runWrongIndexTest(t *testing.T, repropose bool, withErr bool, expProposals tc.Start(t) defer tc.Stop() - prefix := fmt.Sprintf("repropose=%t withErr=%t: ", repropose, withErr) - fatalf := func(msg string, args ...interface{}) { - t.Fatal(errors.Errorf(prefix+msg, args...)) - } + ctx := log.WithLogTag(context.TODO(), "repropose", repropose) + ctx = log.WithLogTag(ctx, "withErr", withErr) type magicKey struct{} var c int32 // updated atomically tc.rng.mu.Lock() + interfere := func(*ProposalData) {} // protected by tc.rng.mu tc.rng.mu.submitProposalFn = func(cmd *ProposalData) error { if v := cmd.ctx.Value(magicKey{}); v != nil { curAttempt := atomic.AddInt32(&c, 1) if (repropose || curAttempt == 2) && withErr { return errors.New("boom") } + if curAttempt == 1 { + interfere(cmd) + } } return defaultSubmitProposalLocked(tc.rng, cmd) } tc.rng.mu.Unlock() - if pErr := tc.rng.redirectOnOrAcquireLease(context.Background()); pErr != nil { - fatalf("%s", pErr) + if pErr := tc.rng.redirectOnOrAcquireLease(ctx); pErr != nil { + t.Fatal(pErr) } pArg := putArgs(roachpb.Key("a"), []byte("asd")) @@ -5869,7 +5871,7 @@ func runWrongIndexTest(t *testing.T, repropose bool, withErr bool, expProposals var ba roachpb.BatchRequest ba.Add(&pArg) ba.Timestamp = tc.clock.Now() - if _, pErr := tc.Sender().Send(context.Background(), ba); pErr != nil { + if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil { t.Fatal(pErr) } } @@ -5884,7 +5886,7 @@ func runWrongIndexTest(t *testing.T, repropose bool, withErr bool, expProposals wrongIndex := ai - 1 // will chose this as MaxLeaseIndex - log.Infof(context.Background(), "test begins") + log.Infof(ctx, "test begins") var ba roachpb.BatchRequest ba.Timestamp = tc.clock.Now() @@ -5895,27 +5897,39 @@ func runWrongIndexTest(t *testing.T, repropose bool, withErr bool, expProposals t.Fatal(err) } ch := func() chan roachpb.ResponseWithError { - tc.rng.mu.Lock() - defer tc.rng.mu.Unlock() // Make a new command, but pretend it didn't increment the assignment // counter. This leaks some implementation, but not too much. + tc.rng.mu.Lock() preAssigned := tc.rng.mu.lastAssignedLeaseIndex - cmd := tc.rng.evaluateProposalLocked( - context.WithValue(context.Background(), magicKey{}, "foo"), + tc.rng.mu.Unlock() + cmd := tc.rng.evaluateProposal( + context.WithValue(ctx, magicKey{}, "foo"), makeIDKey(), repDesc, ba) cmd.RaftCommand.MaxLeaseIndex = preAssigned + + tc.rng.mu.Lock() + defer tc.rng.mu.Unlock() tc.rng.mu.lastAssignedLeaseIndex = preAssigned if err != nil { - fatalf("%s", err) + t.Fatal(err) } - cmd.RaftCommand.MaxLeaseIndex = wrongIndex tc.rng.insertProposalLocked(cmd) if repropose { - if err := tc.rng.refreshPendingCmdsLocked(noReason, 0); err != nil { - fatalf("%s", err) + // Set the wrong index directly. + cmd.RaftCommand.MaxLeaseIndex = wrongIndex + tc.rng.mu.Unlock() + tc.rng.refreshPendingCmds(0) + tc.rng.mu.Lock() + } else { + // We set up proposal interception so that this is called after + // submitProposalLocked has assigned a (reasonable) index. 
+ interfere = func(p *ProposalData) { + log.Infof(context.TODO(), "assigning incorrect index %d", wrongIndex) + p.RaftCommand.MaxLeaseIndex = wrongIndex + } + if err := tc.rng.submitProposalLocked(cmd); err != nil { + t.Fatal(err) } - } else if err := tc.rng.submitProposalLocked(cmd); err != nil { - fatalf("%s", err) } return cmd.done }() @@ -5929,10 +5943,10 @@ func runWrongIndexTest(t *testing.T, repropose bool, withErr bool, expProposals if rwe := <-ch; rwe.Err != nil != withErr || (withErr && !testutils.IsPError(rwe.Err, errStr)) { - fatalf("%s", rwe.Err) + t.Fatal(rwe.Err) } if n := atomic.LoadInt32(&c); n != expProposals { - fatalf("expected %d proposals, got %d", expProposals, n) + t.Fatalf("expected %d proposals, got %d", expProposals, n) } } @@ -5993,14 +6007,13 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { var chs []chan roachpb.ResponseWithError func() { - rng.mu.Lock() - defer rng.mu.Unlock() for i := 0; i < num; i++ { var ba roachpb.BatchRequest ba.Timestamp = tc.clock.Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{ Key: roachpb.Key(fmt.Sprintf("k%d", i))}}) - cmd := rng.evaluateProposalLocked(context.Background(), makeIDKey(), repDesc, ba) + cmd := rng.evaluateProposal(context.Background(), makeIDKey(), repDesc, ba) + rng.mu.Lock() rng.insertProposalLocked(cmd) // We actually propose the command only if we don't // cancel it to simulate the case in which Raft loses @@ -6010,11 +6023,12 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { log.Infof(context.Background(), "abandoning command %d", i) delete(rng.mu.proposals, cmd.idKey) } else if err := rng.submitProposalLocked(cmd); err != nil { - t.Fatal(err) + t.Error(err) } else { chs = append(chs, cmd.done) } + rng.mu.Unlock() } }() @@ -6060,8 +6074,6 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { expIndexes := make([]int, 0, num) chs := func() []chan roachpb.ResponseWithError { - tc.rng.mu.Lock() - defer tc.rng.mu.Unlock() chs := make([]chan roachpb.ResponseWithError, 0, num) origIndexes := make([]int, 0, num) @@ -6072,16 +6084,21 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { ba.Timestamp = tc.clock.Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{ Key: roachpb.Key(fmt.Sprintf("k%d", i))}}) - cmd := tc.rng.evaluateProposalLocked(ctx, makeIDKey(), repDesc, ba) + cmd := tc.rng.evaluateProposal(ctx, makeIDKey(), repDesc, ba) + + tc.rng.mu.Lock() tc.rng.insertProposalLocked(cmd) chs = append(chs, cmd.done) + tc.rng.mu.Unlock() } + tc.rng.mu.Lock() for _, p := range tc.rng.mu.proposals { if v := p.ctx.Value(magicKey{}); v != nil { origIndexes = append(origIndexes, int(p.RaftCommand.MaxLeaseIndex)) } } + tc.rng.mu.Unlock() sort.Ints(origIndexes) @@ -6089,9 +6106,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { t.Fatalf("wanted required indexes %v, got %v", expIndexes, origIndexes) } - if err := tc.rng.refreshPendingCmdsLocked(noReason, 0); err != nil { - t.Fatal(err) - } + tc.rng.refreshPendingCmds(0) return chs }() for _, ch := range chs { @@ -6119,18 +6134,20 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { }) } +// TODO(tschottdorf): I've made this test flaky (reproposals may be triggered +// by leader election and that throws off the test). Fix before merge. 
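The tests in this file drive proposal handling through the injectable submitProposalFn hook, either observing proposals or silently dropping them (as the next test does via dropProposals). A simplified, self-contained sketch of that hook pattern; the stand-in types omit the real locking (the actual field lives under Replica.mu) and the real defaultSubmitProposalLocked:

    package main

    import "fmt"

    type proposalData struct{ id string }

    type replica struct {
        submitProposalFn func(*proposalData) error
    }

    func (r *replica) submitProposal(p *proposalData) error {
        if r.submitProposalFn != nil {
            return r.submitProposalFn(p) // the test hook takes over
        }
        return r.defaultSubmit(p)
    }

    func (r *replica) defaultSubmit(p *proposalData) error {
        fmt.Println("proposing", p.id)
        return nil
    }

    func main() {
        r := &replica{}
        dropped := map[string]bool{"cmd-1": true}
        r.submitProposalFn = func(p *proposalData) error {
            if dropped[p.id] {
                return nil // pretend the proposal went out even though it was dropped
            }
            return r.defaultSubmit(p)
        }
        _ = r.submitProposal(&proposalData{id: "cmd-1"}) // silently dropped
        _ = r.submitProposal(&proposalData{id: "cmd-2"}) // actually "proposed"
    }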
func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { defer leaktest.AfterTest(t)() var tc testContext - tc.Start(t) + cfg := TestStoreConfig() + cfg.RaftTickInterval = 24 * time.Hour // disable ticks + tc.StartWithStoreConfig(t, cfg) defer tc.Stop() // Grab Replica.raftMu in order to block normal raft replica processing. This // test is ticking the replica manually and doesn't want the store to be // doing so concurrently. r := tc.rng - r.raftMu.Lock() - defer r.raftMu.Unlock() repDesc, err := r.GetReplicaDescriptor() if err != nil { @@ -6148,29 +6165,51 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ticks := r.mu.ticks r.mu.Unlock() for ; (ticks % electionTicks) != 0; ticks++ { - if _, err := r.tickRaftMuLocked(); err != nil { + if _, err := r.tick(); err != nil { t.Fatal(err) } } } + var dropProposals struct { + syncutil.Mutex + m map[*ProposalData]struct{} + } + dropProposals.m = make(map[*ProposalData]struct{}) + + r.mu.Lock() + r.mu.submitProposalFn = func(pd *ProposalData) error { + dropProposals.Lock() + defer dropProposals.Unlock() + if _, ok := dropProposals.m[pd]; !ok { + return defaultSubmitProposalLocked(r, pd) + } + return nil // pretend we proposed though we haven't + } + r.mu.Unlock() + // We tick the replica 2*RaftElectionTimeoutTicks. RaftElectionTimeoutTicks // is special in that it controls how often pending commands are reproposed // or refurbished. for i := 0; i < 2*electionTicks; i++ { // Add another pending command on each iteration. - r.mu.Lock() id := fmt.Sprintf("%08d", i) var ba roachpb.BatchRequest ba.Timestamp = tc.clock.Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{Key: roachpb.Key(id)}}) - cmd := r.evaluateProposalLocked(context.Background(), + cmd := r.evaluateProposal(context.Background(), storagebase.CmdIDKey(id), repDesc, ba) + + dropProposals.Lock() + dropProposals.m[cmd] = struct{}{} // silently drop proposals + dropProposals.Unlock() + + r.mu.Lock() r.insertProposalLocked(cmd) if err := r.submitProposalLocked(cmd); err != nil { - t.Fatal(err) + t.Error(err) } - // Build a map from command key to proposed-at-ticks. + // Build the map of expected reproposals at this stage. m := map[storagebase.CmdIDKey]int{} for id, p := range r.mu.proposals { m[id] = p.proposedAtTicks @@ -6178,32 +6217,45 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { r.mu.Unlock() // Tick raft. - if _, err := r.tickRaftMuLocked(); err != nil { + if _, err := r.tick(); err != nil { t.Fatal(err) } - // Gather up the reproposed commands. - r.mu.Lock() - var reproposed []*ProposalData - for id, p := range r.mu.proposals { - if m[id] != p.proposedAtTicks { - reproposed = append(reproposed, p) + // Gather up the reproposed commands. Since reproposals are driven + // by the scheduler, we have to wait until it's gotten around to it. + util.SucceedsSoon(t, func() error { + r.mu.Lock() + ticks := r.mu.ticks + r.mu.Unlock() + + var reproposed []*ProposalData + r.mu.Lock() // avoid data race - proposals belong to the Replica + dropProposals.Lock() + for p := range dropProposals.m { + if p.proposedAtTicks >= ticks { + reproposed = append(reproposed, p) + } } - } - ticks := r.mu.ticks - r.mu.Unlock() - - // Reproposals are only performed every electionTicks. We'll need to fix - // this test if that changes. - if (ticks % electionTicks) == 0 { - if len(reproposed) != i-1 { - t.Fatalf("%d: expected %d reproposed commands, but found %+v", i, i-1, reproposed) + dropProposals.Unlock() + r.mu.Unlock() + + // Reproposals are only performed every electionTicks. 
We'll need + // to fix this test if that changes. + var err error + if (ticks % electionTicks) == 0 { + if len(reproposed) != i-1 { + err = errors.Errorf("%d: expected %d reproposed commands, but found %d", i, i-1, len(reproposed)) + } + } else { + if len(reproposed) != 0 { + t.Fatalf("%d: expected no reproposed commands, but found %+v", i, reproposed) + } } - } else { - if len(reproposed) != 0 { - t.Fatalf("%d: expected no reproposed commands, but found %+v", i, reproposed) + if err != nil { + log.Warning(context.TODO(), err) } - } + return err + }) } } @@ -6226,10 +6278,8 @@ func TestReplicaDoubleRefurbish(t *testing.T) { // Make a Raft command; we'll set things up so that it will be considered // for refurbishment multiple times. - tc.rng.mu.Lock() - cmd := tc.rng.evaluateProposalLocked(context.Background(), makeIDKey(), repDesc, ba) + cmd := tc.rng.evaluateProposal(context.Background(), makeIDKey(), repDesc, ba) ch := cmd.done // must not use cmd outside of mutex - tc.rng.mu.Unlock() { // Send some random request to advance the lease applied counter to diff --git a/pkg/storage/scheduler.go b/pkg/storage/scheduler.go index 5f360f5ad844..d9bad2d6fc7c 100644 --- a/pkg/storage/scheduler.go +++ b/pkg/storage/scheduler.go @@ -115,6 +115,12 @@ type raftProcessor interface { // Process a raft tick for the specified range. Return true if the range // should be queued for ready processing. processTick(rangeID roachpb.RangeID) bool + // Inspect the pending proposals, refurbishing and reproposing as + // necessary (for instance, after a Raft election event). + processRefreshAll(rangeID roachpb.RangeID) + // Like processRefreshAll, but inspecting only proposals which have been + // pending for long enough to assume that they have been lost. + processRefreshStale(rangeID roachpb.RangeID) } type raftScheduleState int @@ -124,6 +130,8 @@ const ( stateRaftReady stateRaftRequest stateRaftTick + stateRaftRefreshStale + stateRaftRefreshAll ) type raftScheduler struct { @@ -221,6 +229,12 @@ func (s *raftScheduler) worker(stopper *stop.Stopper) { s.mu.state[id] = stateQueued s.mu.Unlock() + if state&stateRaftRefreshAll != 0 { + s.processor.processRefreshAll(id) + } else if state&stateRaftRefreshStale != 0 { + s.processor.processRefreshStale(id) + } + if state&stateRaftTick != 0 { // processRaftTick returns true if the range should perform ready // processing. Do not reorder this below the call to processReady. 
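The worker dispatch above treats the per-range scheduler state as a set of bit flags: enqueueing ORs a flag into the range's state, and the worker later resets the state to stateQueued and acts on each bit, with refresh-all taking precedence over refresh-stale. A heavily simplified sketch of those semantics (plain int64 range IDs, no queue or worker pool):

    package main

    import "fmt"

    type scheduleState int

    const (
        stateQueued scheduleState = 1 << iota
        stateRaftReady
        stateRaftRequest
        stateRaftTick
        stateRaftRefreshStale
        stateRaftRefreshAll
    )

    type scheduler struct{ state map[int64]scheduleState }

    func (s *scheduler) enqueue(id int64, flag scheduleState) {
        s.state[id] |= flag | stateQueued
    }

    func (s *scheduler) processOne(id int64) {
        st := s.state[id]
        s.state[id] = stateQueued // the worker performs the same reset before dispatching
        if st&stateRaftRefreshAll != 0 {
            fmt.Println(id, "refresh all pending proposals")
        } else if st&stateRaftRefreshStale != 0 {
            fmt.Println(id, "refresh only stale pending proposals")
        }
        if st&stateRaftTick != 0 {
            fmt.Println(id, "tick")
        }
    }

    func main() {
        s := &scheduler{state: map[int64]scheduleState{}}
        s.enqueue(1, stateRaftRefreshAll)
        s.enqueue(1, stateRaftTick)
        s.processOne(1) // refresh-all wins over refresh-stale, then the tick runs
    }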
@@ -319,3 +333,11 @@ func (s *raftScheduler) EnqueueRaftRequest(id roachpb.RangeID) { func (s *raftScheduler) EnqueueRaftTick(ids ...roachpb.RangeID) { s.signal(s.enqueueN(stateRaftTick, ids...)) } + +func (s *raftScheduler) EnqueueRaftRefreshStale(id roachpb.RangeID) { + s.signal(s.enqueue1(stateRaftRefreshStale, id)) +} + +func (s *raftScheduler) EnqueueRaftRefreshAll(id roachpb.RangeID) { + s.signal(s.enqueue1(stateRaftRefreshAll, id)) +} diff --git a/pkg/storage/scheduler_test.go b/pkg/storage/scheduler_test.go index 7e67529c26e4..d158934e3e9e 100644 --- a/pkg/storage/scheduler_test.go +++ b/pkg/storage/scheduler_test.go @@ -127,9 +127,11 @@ func TestRangeIDQueue(t *testing.T) { type testProcessor struct { mu struct { syncutil.Mutex - raftReady map[roachpb.RangeID]int - raftRequest map[roachpb.RangeID]int - raftTick map[roachpb.RangeID]int + raftReady map[roachpb.RangeID]int + raftRequest map[roachpb.RangeID]int + raftTick map[roachpb.RangeID]int + raftRefreshAll map[roachpb.RangeID]int + raftRefreshStale map[roachpb.RangeID]int } } @@ -138,6 +140,8 @@ func newTestProcessor() *testProcessor { p.mu.raftReady = make(map[roachpb.RangeID]int) p.mu.raftRequest = make(map[roachpb.RangeID]int) p.mu.raftTick = make(map[roachpb.RangeID]int) + p.mu.raftRefreshStale = make(map[roachpb.RangeID]int) + p.mu.raftRefreshAll = make(map[roachpb.RangeID]int) return p } @@ -153,6 +157,18 @@ func (p *testProcessor) processRequestQueue(rangeID roachpb.RangeID) { p.mu.Unlock() } +func (p *testProcessor) processRefreshAll(rangeID roachpb.RangeID) { + p.mu.Lock() + p.mu.raftRefreshAll[rangeID]++ + p.mu.Unlock() +} + +func (p *testProcessor) processRefreshStale(rangeID roachpb.RangeID) { + p.mu.Lock() + p.mu.raftRefreshStale[rangeID]++ + p.mu.Unlock() +} + func (p *testProcessor) processTick(rangeID roachpb.RangeID) bool { p.mu.Lock() p.mu.raftTick[rangeID]++ @@ -190,6 +206,8 @@ func (p *testProcessor) String() string { // Verify that enqueuing more ranges than the number of workers correctly // processes all of the ranges. This exercises a code path that was buggy // during development. +// +// TODO(tschottdorf): test raftRefreshAll, raftRefreshStale. func TestSchedulerLoop(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 594cb861fb7b..f5ad1f94eab3 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2901,6 +2901,24 @@ func (s *Store) processTick(rangeID roachpb.RangeID) bool { return exists // ready } +func (s *Store) processRefreshStale(rangeID roachpb.RangeID) { + s.mu.Lock() + r, ok := s.mu.replicas[rangeID] + s.mu.Unlock() + if ok { + r.refreshPendingCmds(s.cfg.RaftElectionTimeoutTicks) + } +} + +func (s *Store) processRefreshAll(rangeID roachpb.RangeID) { + s.mu.Lock() + r, ok := s.mu.replicas[rangeID] + s.mu.Unlock() + if ok { + r.refreshPendingCmds(0) + } +} + func (s *Store) processRaft() { if s.cfg.TestingKnobs.DisableProcessRaft { return
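Store.processRefreshStale and Store.processRefreshAll above share one small pattern: look the replica up while holding the store mutex, release it, and only then let the replica do the actual (potentially slow) refresh. A simplified stand-in sketch of that pattern, not the real Store and Replica types:

    package main

    import (
        "fmt"
        "sync"
    )

    type replica struct{ id int64 }

    func (r *replica) refreshPendingCmds(refreshAtDelta int) {
        fmt.Printf("replica %d: refreshing proposals older than %d ticks\n", r.id, refreshAtDelta)
    }

    type store struct {
        mu       sync.Mutex
        replicas map[int64]*replica
    }

    func (s *store) processRefreshStale(id int64, electionTimeoutTicks int) {
        s.mu.Lock()
        r, ok := s.replicas[id]
        s.mu.Unlock() // do not hold the store mutex while the replica does real work
        if ok {
            r.refreshPendingCmds(electionTimeoutTicks)
        }
    }

    func main() {
        s := &store{replicas: map[int64]*replica{7: {id: 7}}}
        s.processRefreshStale(7, 3)
    }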