diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 6bccc23af56a..27f035834181 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -299,7 +299,9 @@ type Replica struct { // propose has been called, but which have not yet // applied. // - // TODO(tschottdorf): evaluate whether this should be a list/slice. + // The *ProposalData in the map are "owned" by it. Elements from the + // map must only be referenced while Replica.mu is held, except if the + // element is removed from the map first. proposals map[storagebase.CmdIDKey]*ProposalData internalRaftGroup *raft.RawNode // The ID of the replica within the Raft group. May be 0 if the replica has @@ -701,10 +703,8 @@ func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error { // If there was a previous replica, repropose its pending commands under // this new incarnation. if previousReplicaID != 0 { - // propose pending commands under new replicaID - if err := r.refreshPendingCmdsLocked(reasonReplicaIDChanged, 0); err != nil { - return err - } + // repropose all pending commands under new replicaID. + r.refreshProposalsLocked(0, reasonReplicaIDChanged) } return nil @@ -1568,16 +1568,27 @@ func (r *Replica) assert5725(ba roachpb.BatchRequest) { // - the timestamp cache is checked to determine if the command's affected keys // were accessed with a timestamp exceeding that of the command; if so, the // command's timestamp is incremented accordingly. -// - a ProposalData is created and inserted into the Replica's in-flight -// proposals map, a lease index is assigned to it, and it is submitted to Raft, -// returning a channel. +// - the command is evaluated, resulting in a ProposalData. As proposer-eval'ed +// KV isn't implemented yet, this means that the BatchRequest is added to +// the resulting ProposalData object. With proposer-evaluated KV, a +// WriteBatch containing the raw effects of the command's application is +// added along with some auxiliary data. +// - the ProposalData is inserted into the Replica's in-flight proposals map, +// a lease index is assigned to it, and it is submitted to Raft, returning +// a channel. // - the result of the Raft proposal is read from the channel and the command -// registered with the timestamp cache, removed from the command queue, and its -// result (which could be an error) returned to the client. +// registered with the timestamp cache, removed from the command queue, and +// its result (which could be an error) returned to the client. // -// Internally, multiple iterations of the above process are may take place due -// to the (rare) need to the Raft proposal failing retryably (usually due to -// proposal reordering). +// Internally, multiple iterations of the above process may take place due to +// the (rare) possibility of the Raft proposal failing retryably (usually due +// to proposal reordering). +// +// TODO(tschottdorf): take special care with "special" commands and their +// reorderings. For example, a batch of writes and a split could be in flight +// in parallel without overlap, but if the writes hit the RHS, something must +// prevent them from writing outside of the key range when they apply. +// Similarly, a command proposed under lease A must not apply under lease B. func (r *Replica) addWriteCmd( ctx context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { @@ -1688,30 +1699,29 @@ func (r *Replica) tryAddWriteCmd( } } -// TODO(tschottdorf): for proposer-evaluated Raft, need to refactor so that -// this does not happen under Replica.mu. 
-func (r *Replica) evaluateProposalLocked( +// evaluateProposal generates ProposalData from the given request by evaluating +// it, returning both state which is held only on the proposer and that which +// is to be replicated through Raft. The return value is ready to be inserted +// into Replica's proposal map and subsequently passed to submitProposalLocked. +// +// Replica.mu must not be held. +// +// TODO(tschottdorf): with proposer-evaluated KV, a WriteBatch will be prepared +// in this method. +func (r *Replica) evaluateProposal( ctx context.Context, idKey storagebase.CmdIDKey, replica roachpb.ReplicaDescriptor, ba roachpb.BatchRequest, ) *ProposalData { - if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex { - r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex - } - if !ba.IsLeaseRequest() { - r.mu.lastAssignedLeaseIndex++ - } - if log.V(4) { - log.Infof(ctx, "prepared command %x: maxLeaseIndex=%d leaseAppliedIndex=%d", - idKey, r.mu.lastAssignedLeaseIndex, r.mu.state.LeaseAppliedIndex) - } + // Note that we don't hold any locks at this point. This is important + // since evaluating a proposal is expensive (at least under proposer- + // evaluated KV). var pd ProposalData pd.RaftCommand = &storagebase.RaftCommand{ RangeID: r.RangeID, OriginReplica: replica, Cmd: ba, - MaxLeaseIndex: r.mu.lastAssignedLeaseIndex, } pd.ctx = ctx pd.idKey = idKey @@ -1719,13 +1729,28 @@ func (r *Replica) evaluateProposalLocked( return &pd } -func (r *Replica) insertProposalLocked(pCmd *ProposalData) { - idKey := pCmd.idKey - if _, ok := r.mu.proposals[idKey]; ok { +func (r *Replica) insertProposalLocked(pd *ProposalData) { + // Assign a lease index. Note that we do this as late as possible + // to make sure (to the extent that we can) that we don't assign + // (=predict) the index differently from the order in which commands are + // proposed (and thus likely applied). + if r.mu.lastAssignedLeaseIndex < r.mu.state.LeaseAppliedIndex { + r.mu.lastAssignedLeaseIndex = r.mu.state.LeaseAppliedIndex + } + if !pd.RaftCommand.Cmd.IsLeaseRequest() { + r.mu.lastAssignedLeaseIndex++ + } + pd.RaftCommand.MaxLeaseIndex = r.mu.lastAssignedLeaseIndex + if log.V(4) { + log.Infof(pd.ctx, "submitting proposal %x: maxLeaseIndex=%d", + pd.idKey, pd.RaftCommand.MaxLeaseIndex) + } + + if _, ok := r.mu.proposals[pd.idKey]; ok { ctx := r.AnnotateCtx(context.TODO()) - log.Fatalf(ctx, "pending command already exists for %s", idKey) + log.Fatalf(ctx, "pending command already exists for %s", pd.idKey) } - r.mu.proposals[idKey] = pCmd + r.mu.proposals[pd.idKey] = pd } func makeIDKey() storagebase.CmdIDKey { @@ -1748,6 +1773,18 @@ func makeIDKey() storagebase.CmdIDKey { func (r *Replica) propose( ctx context.Context, ba roachpb.BatchRequest, ) (chan proposalResult, func() bool, error) { + r.mu.Lock() + if r.mu.destroyed != nil { + r.mu.Unlock() + return nil, nil, r.mu.destroyed + } + repDesc, err := r.getReplicaDescriptorLocked() + if err != nil { + r.mu.Unlock() + return nil, nil, err + } + r.mu.Unlock() + // submitProposalLocked calls withRaftGroupLocked which requires that // raftMu is held. In order to maintain our lock ordering we need to lock // Replica.raftMu here before locking Replica.mu. @@ -1757,18 +1794,17 @@ func (r *Replica) propose( // optimize for the common case where Replica.mu.internalRaftGroup is // non-nil, but that doesn't currently seem worth it. Always locking raftMu // has a tiny (~1%) performance hit for single-node block_writer testing. 
+ // + // TODO(tschottdorf): holding raftMu during evaluation limits concurrency + // at the range level and is something we will eventually need to address. + // See #10084. r.raftMu.Lock() defer r.raftMu.Unlock() + + pCmd := r.evaluateProposal(ctx, makeIDKey(), repDesc, ba) + r.mu.Lock() defer r.mu.Unlock() - if r.mu.destroyed != nil { - return nil, nil, r.mu.destroyed - } - repDesc, err := r.getReplicaDescriptorLocked() - if err != nil { - return nil, nil, err - } - pCmd := r.evaluateProposalLocked(ctx, makeIDKey(), repDesc, ba) r.insertProposalLocked(pCmd) if err := r.submitProposalLocked(pCmd); err != nil { @@ -1789,6 +1825,7 @@ // The replica lock must be held. func (r *Replica) submitProposalLocked(p *ProposalData) error { p.proposedAtTicks = r.mu.ticks + if r.mu.submitProposalFn != nil { return r.mu.submitProposalFn(p) } @@ -1938,7 +1975,7 @@ func (r *Replica) handleRaftReady(inSnap IncomingSnapshot) error { } -// handleRaftReadyLocked is the same as handleRaftReady but requires that the -// replica be locked for raft processing via r.raftLock. +// handleRaftReadyRaftMuLocked is the same as handleRaftReady but requires +// that the replica's raftMu be held. func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error { ctx := r.AnnotateCtx(context.TODO()) var hasReady bool @@ -2139,11 +2176,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked(inSnap IncomingSnapshot) error { } if refreshReason != noReason { r.mu.Lock() - err := r.refreshPendingCmdsLocked(refreshReason, 0) + r.refreshProposalsLocked(0, refreshReason) r.mu.Unlock() - if err != nil { - return err - } } // TODO(bdarnell): need to check replica id and not Advance if it @@ -2163,6 +2197,7 @@ func (r *Replica) tick() (bool, error) { return r.tickRaftMuLocked() } +// tickRaftMuLocked requires that raftMu is held, but not Replica.mu. func (r *Replica) tickRaftMuLocked() (bool, error) { r.mu.Lock() defer r.mu.Unlock() @@ -2209,10 +2244,9 @@ func (r *Replica) tickRaftMuLocked() (bool, error) { -// RaftElectionTimeoutTicks to refreshPendingCmdsLocked means that commands +// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands // will be refreshed when they have been pending for 1 to 2 election // cycles. - if err := r.refreshPendingCmdsLocked( - reasonTicks, r.store.cfg.RaftElectionTimeoutTicks); err != nil { - return true, err - } + r.refreshProposalsLocked( + r.store.cfg.RaftElectionTimeoutTicks, reasonTicks, + ) } return true, nil } @@ -2412,11 +2446,16 @@ const ( reasonTicks ) -func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDelta int) error { - if len(r.mu.proposals) == 0 { - return nil - } - +// refreshProposalsLocked goes through the pending proposals, notifying +// proposers whose proposals need to be retried, and resubmitting proposals +// which were likely dropped (but may still apply at a legal Lease index). +// mu must be held. +// +// refreshAtDelta specifies how old (in ticks) a command must be for it to be +// inspected; usually this is called with zero (affect everything) or the +// number of ticks of an election timeout (affect only proposals that have had +// ample time to apply but didn't). +func (r *Replica) refreshProposalsLocked(refreshAtDelta int, reason refreshRaftReason) { // Note that we can't use the commit index here (which is typically a // little ahead), because a pending command is removed only as it applies. 
// Thus we'd risk reproposing a command that has been committed but not yet @@ -2447,7 +2486,7 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe if log.V(1) && (numShouldRetry > 0 || len(reproposals) > 0) { ctx := r.AnnotateCtx(context.TODO()) log.Infof(ctx, - "pending commands: sent %d back to client, reproposing %d (at %d.%d); %s", + "pending commands: sent %d back to client, reproposing %d (at %d.%d): %s", numShouldRetry, len(reproposals), r.mu.state.RaftAppliedIndex, r.mu.state.LeaseAppliedIndex, reason) } @@ -2457,14 +2496,17 @@ func (r *Replica) refreshPendingCmdsLocked(reason refreshRaftReason, refreshAtDe // the "correct" index). For reproposals, it's generally pretty unlikely // that they can make it in the right place. Reproposing in order is // definitely required, however. + // + // TODO(tschottdorf): evaluate whether `proposals` should be a list/slice. sort.Sort(reproposals) for _, p := range reproposals { - log.Eventf(p.ctx, "reproposing command %x; %s", p.idKey, reason) + log.Eventf(p.ctx, "re-submitting command %x to Raft: %s", p.idKey, reason) if err := r.submitProposalLocked(p); err != nil { - return err + delete(r.mu.proposals, p.idKey) + p.done <- proposalResult{Err: roachpb.NewError(err)} + close(p.done) } } - return nil } func (r *Replica) getReplicaDescriptorByIDLocked( diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 99f463d09834..ed9a106ca2d8 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -5783,7 +5783,11 @@ func TestReplicaIDChangePending(t *testing.T) { defer leaktest.AfterTest(t)() tc := testContext{} - tc.Start(t) + cfg := TestStoreConfig() + // Disable ticks to avoid automatic reproposals after a timeout, which + // would make this test pass spuriously. + cfg.RaftTickInterval = time.Hour + tc.StartWithStoreConfig(t, cfg) defer tc.Stop() rng := tc.rng @@ -5810,29 +5814,26 @@ func TestReplicaIDChangePending(t *testing.T) { // re-proposed. commandProposed := make(chan struct{}, 1) rng.mu.Lock() - defer rng.mu.Unlock() rng.mu.submitProposalFn = func(p *ProposalData) error { if p.RaftCommand.Cmd.Timestamp.Equal(magicTS) { commandProposed <- struct{}{} } return nil } + rng.mu.Unlock() // Set the ReplicaID on the replica. 
- if err := rng.setReplicaIDLocked(2); err != nil { + if err := rng.setReplicaID(2); err != nil { t.Fatal(err) } - select { - case <-commandProposed: - default: - t.Fatal("command was not re-proposed") - } + <-commandProposed } func TestReplicaRetryRaftProposal(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.TODO() var tc testContext tc.Start(t) defer tc.Stop() @@ -5858,7 +5859,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { var ba roachpb.BatchRequest ba.Add(&pArg) ba.Timestamp = tc.clock.Now() - if _, pErr := tc.Sender().Send(context.Background(), ba); pErr != nil { + if _, pErr := tc.Sender().Send(ctx, ba); pErr != nil { t.Fatal(pErr) } } @@ -5873,7 +5874,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { wrongLeaseIndex = ai - 1 // used by submitProposalFn above - log.Infof(context.Background(), "test begins") + log.Infof(ctx, "test begins") var ba roachpb.BatchRequest ba.Timestamp = tc.clock.Now() @@ -5882,7 +5883,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { ba.Add(&iArg) { br, pErr, shouldRetry := tc.rng.tryAddWriteCmd( - context.WithValue(context.Background(), magicKey{}, "foo"), + context.WithValue(ctx, magicKey{}, "foo"), ba, ) if !shouldRetry { @@ -5896,7 +5897,7 @@ func TestReplicaRetryRaftProposal(t *testing.T) { atomic.StoreInt32(&c, 0) { br, pErr := tc.rng.addWriteCmd( - context.WithValue(context.Background(), magicKey{}, "foo"), + context.WithValue(ctx, magicKey{}, "foo"), ba, ) if pErr != nil { @@ -5931,14 +5932,13 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { var chs []chan proposalResult func() { - rng.mu.Lock() - defer rng.mu.Unlock() for i := 0; i < num; i++ { var ba roachpb.BatchRequest ba.Timestamp = tc.clock.Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{ Key: roachpb.Key(fmt.Sprintf("k%d", i))}}) - cmd := rng.evaluateProposalLocked(context.Background(), makeIDKey(), repDesc, ba) + cmd := rng.evaluateProposal(context.Background(), makeIDKey(), repDesc, ba) + rng.mu.Lock() rng.insertProposalLocked(cmd) // We actually propose the command only if we don't // cancel it to simulate the case in which Raft loses @@ -5948,11 +5948,12 @@ func TestReplicaCancelRaftCommandProgress(t *testing.T) { log.Infof(context.Background(), "abandoning command %d", i) delete(rng.mu.proposals, cmd.idKey) } else if err := rng.submitProposalLocked(cmd); err != nil { - t.Fatal(err) + t.Error(err) } else { chs = append(chs, cmd.done) } + rng.mu.Unlock() } }() @@ -6010,16 +6011,21 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { ba.Timestamp = tc.clock.Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{ Key: roachpb.Key(fmt.Sprintf("k%d", i))}}) - cmd := tc.rng.evaluateProposalLocked(ctx, makeIDKey(), repDesc, ba) + cmd := tc.rng.evaluateProposal(ctx, makeIDKey(), repDesc, ba) + + tc.rng.mu.Lock() tc.rng.insertProposalLocked(cmd) chs = append(chs, cmd.done) + tc.rng.mu.Unlock() } + tc.rng.mu.Lock() for _, p := range tc.rng.mu.proposals { if v := p.ctx.Value(magicKey{}); v != nil { origIndexes = append(origIndexes, int(p.RaftCommand.MaxLeaseIndex)) } } + tc.rng.mu.Unlock() sort.Ints(origIndexes) @@ -6027,9 +6033,7 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { t.Fatalf("wanted required indexes %v, got %v", expIndexes, origIndexes) } - if err := tc.rng.refreshPendingCmdsLocked(noReason, 0); err != nil { - t.Fatal(err) - } + tc.rng.refreshProposalsLocked(0, reasonTicks) return chs }() for _, ch := range chs { @@ -6042,33 +6046,30 @@ func TestReplicaBurstPendingCommandsAndRepropose(t *testing.T) { 
t.Fatalf("expected indexes %v, got %v", expIndexes, seenCmds) } - util.SucceedsSoon(t, func() error { - tc.rng.mu.Lock() - defer tc.rng.mu.Unlock() - nonePending := len(tc.rng.mu.proposals) == 0 - c := int(tc.rng.mu.lastAssignedLeaseIndex) - int(tc.rng.mu.state.LeaseAppliedIndex) - if nonePending && c > 0 { - return fmt.Errorf("no pending cmds, but have required index offset %d", c) - } - if nonePending { - return nil - } - return errors.New("still pending commands") - }) + tc.rng.mu.Lock() + defer tc.rng.mu.Unlock() + nonePending := len(tc.rng.mu.proposals) == 0 + c := int(tc.rng.mu.lastAssignedLeaseIndex) - int(tc.rng.mu.state.LeaseAppliedIndex) + if nonePending && c > 0 { + t.Errorf("no pending cmds, but have required index offset %d", c) + } + if !nonePending { + t.Fatalf("still pending commands: %+v", tc.rng.mu.proposals) + } } func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { defer leaktest.AfterTest(t)() var tc testContext - tc.Start(t) + cfg := TestStoreConfig() + cfg.RaftTickInterval = 24 * time.Hour // disable ticks + tc.StartWithStoreConfig(t, cfg) defer tc.Stop() // Grab Replica.raftMu in order to block normal raft replica processing. This // test is ticking the replica manually and doesn't want the store to be // doing so concurrently. r := tc.rng - r.raftMu.Lock() - defer r.raftMu.Unlock() repDesc, err := r.GetReplicaDescriptor() if err != nil { @@ -6086,28 +6087,50 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ticks := r.mu.ticks r.mu.Unlock() for ; (ticks % electionTicks) != 0; ticks++ { - if _, err := r.tickRaftMuLocked(); err != nil { + if _, err := r.tick(); err != nil { t.Fatal(err) } } } + var dropProposals struct { + syncutil.Mutex + m map[*ProposalData]struct{} + } + dropProposals.m = make(map[*ProposalData]struct{}) + + r.mu.Lock() + r.mu.submitProposalFn = func(pd *ProposalData) error { + dropProposals.Lock() + defer dropProposals.Unlock() + if _, ok := dropProposals.m[pd]; !ok { + return defaultSubmitProposalLocked(r, pd) + } + return nil // pretend we proposed though we didn't + } + r.mu.Unlock() + // We tick the replica 2*RaftElectionTimeoutTicks. RaftElectionTimeoutTicks // is special in that it controls how often pending commands are reproposed. for i := 0; i < 2*electionTicks; i++ { // Add another pending command on each iteration. - r.mu.Lock() id := fmt.Sprintf("%08d", i) var ba roachpb.BatchRequest ba.Timestamp = tc.clock.Now() ba.Add(&roachpb.PutRequest{Span: roachpb.Span{Key: roachpb.Key(id)}}) - cmd := r.evaluateProposalLocked(context.Background(), + cmd := r.evaluateProposal(context.Background(), storagebase.CmdIDKey(id), repDesc, ba) + + dropProposals.Lock() + dropProposals.m[cmd] = struct{}{} // silently drop proposals + dropProposals.Unlock() + + r.mu.Lock() r.insertProposalLocked(cmd) if err := r.submitProposalLocked(cmd); err != nil { - t.Fatal(err) + t.Error(err) } - // Build a map from command key to proposed-at-ticks. + // Build the map of expected reproposals at this stage. m := map[storagebase.CmdIDKey]int{} for id, p := range r.mu.proposals { m[id] = p.proposedAtTicks @@ -6115,26 +6138,30 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { r.mu.Unlock() // Tick raft. - if _, err := r.tickRaftMuLocked(); err != nil { + if _, err := r.tick(); err != nil { t.Fatal(err) } - // Gather up the reproposed commands. 
r.mu.Lock() + ticks := r.mu.ticks + r.mu.Unlock() + var reproposed []*ProposalData - for id, p := range r.mu.proposals { - if m[id] != p.proposedAtTicks { + r.mu.Lock() // avoid data race - proposals belong to the Replica + dropProposals.Lock() + for p := range dropProposals.m { + if p.proposedAtTicks >= ticks { reproposed = append(reproposed, p) } } - ticks := r.mu.ticks + dropProposals.Unlock() r.mu.Unlock() - // Reproposals are only performed every electionTicks. We'll need to fix - // this test if that changes. + // Reproposals are only performed every electionTicks. We'll need + // to fix this test if that changes. if (ticks % electionTicks) == 0 { if len(reproposed) != i-1 { - t.Fatalf("%d: expected %d reproposed commands, but found %+v", i, i-1, reproposed) + t.Fatalf("%d: expected %d reproposed commands, but found %d", i, i-1, len(reproposed)) } } else { if len(reproposed) != 0 {