diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 4c199f2d1d1c..284e2fc8b361 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -571,8 +571,16 @@ func (r *Replica) String() string { return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr) } -// cleanupFailedProposalLocked cleans up after a proposal that has failed. It +// cleanupFailedProposal cleans up after a proposal that has failed. It // clears any references to the proposal and releases associated quota. +func (r *Replica) cleanupFailedProposal(p *ProposalData) { + r.mu.Lock() + defer r.mu.Unlock() + r.cleanupFailedProposalLocked(p) +} + +// cleanupFailedProposalLocked is like cleanupFailedProposal, but requires +// the Replica mutex to be exclusively held. func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) { // Clear the proposal from the proposals map. May be a no-op if the // proposal has not yet been inserted into the map. diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go index 4efcd73a1a12..93a7814af2ae 100644 --- a/pkg/storage/replica_proposal_buf.go +++ b/pkg/storage/replica_proposal_buf.go @@ -136,7 +136,7 @@ type propBuf struct { testing struct { // leaseIndexFilter can be used by tests to override the max lease index // assigned to a proposal by returning a non-zero lease index. - leaseIndexFilter func(*ProposalData) (indexOverride uint64) + leaseIndexFilter func(*ProposalData) (indexOverride uint64, err error) // submitProposalFilter can be used by tests to observe and optionally // drop Raft proposals before they are handed to etcd/raft to begin the // process of replication. Dropped proposals are still eligible to be @@ -208,7 +208,9 @@ func (b *propBuf) Insert(p *ProposalData, data []byte) (uint64, error) { // Assign the command's maximum lease index. p.command.MaxLeaseIndex = b.liBase + res.leaseIndexOffset() if filter := b.testing.leaseIndexFilter; filter != nil { - if override := filter(p); override != 0 { + if override, err := filter(p); err != nil { + return 0, err + } else if override != 0 { p.command.MaxLeaseIndex = override } } diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 25c81a445d53..e55999d4ca7a 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -190,6 +190,14 @@ func (r *Replica) evalAndPropose( if err := r.maybeAcquireProposalQuota(ctx, proposal.quotaSize); err != nil { return nil, nil, 0, roachpb.NewError(err) } + // Make sure we clean up the proposal if we fail to insert it into the + // proposal buffer successfully. This ensures that we always release any + // quota that we acquire. + defer func() { + if pErr != nil { + r.cleanupFailedProposal(proposal) + } + }() if filter := r.store.TestingKnobs().TestingProposalFilter; filter != nil { filterArgs := storagebase.ProposalFilterArgs{ @@ -230,11 +238,13 @@ func (r *Replica) evalAndPropose( return proposalCh, abandon, maxLeaseIndex, nil } -// propose starts tracking a command and proposes it to raft. If -// this method succeeds, the caller is responsible for eventually -// removing the proposal from the pending map (on success, in -// processRaftCommand, or on failure via cleanupFailedProposalLocked). -func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr *roachpb.Error) { +// propose encodes a command, starts tracking it, and proposes it to raft. The +// method is also responsible for assigning the command its maximum lease index. +// +// The method hands ownership of the command over to the Raft machinery. After +// the method returns, all access to the command must be performed while holding +// Replica.mu and Replica.raftMu. +func (r *Replica) propose(ctx context.Context, p *ProposalData) (int64, *roachpb.Error) { // Make sure the maximum lease index is unset. This field will be set in // propBuf.Insert and its encoded bytes will be appended to the encoding // buffer as a RaftCommandFooter. @@ -258,7 +268,8 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (_ int64, pErr * // leases can stay in such a state for a very long time when using epoch- // based range leases). This shouldn't happen often, but has been seen // before (#12591). - if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == r.mu.replicaID { + replID := p.command.ProposerReplica.ReplicaID + if crt.ChangeType == roachpb.REMOVE_REPLICA && crt.Replica.ReplicaID == replID { msg := fmt.Sprintf("received invalid ChangeReplicasTrigger %s to remove self (leaseholder)", crt) log.Error(p.ctx, msg) return 0, roachpb.NewErrorf("%s: %s", r, msg) diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 1db9346563e0..99d9c9b9a230 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -6532,6 +6532,49 @@ func TestReplicaDestroy(t *testing.T) { } } +// TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by +// proposals is released back into the quota pool if the proposal fails before +// being submitted to Raft. +func TestQuotaPoolReleasedOnFailedProposal(t *testing.T) { + defer leaktest.AfterTest(t)() + tc := testContext{} + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + tc.Start(t, stopper) + + // Flush a write all the way through the Raft proposal pipeline to ensure + // that the replica becomes the Raft leader and sets up its quota pool. + iArgs := incrementArgs([]byte("a"), 1) + if _, pErr := tc.SendWrapped(&iArgs); pErr != nil { + t.Fatal(pErr) + } + + type magicKey struct{} + var minQuotaSize int64 + propErr := errors.New("proposal error") + + tc.repl.mu.Lock() + tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) { + if v := p.ctx.Value(magicKey{}); v != nil { + minQuotaSize = tc.repl.mu.proposalQuota.approximateQuota() + p.quotaSize + return 0, propErr + } + return 0, nil + } + tc.repl.mu.Unlock() + + var ba roachpb.BatchRequest + pArg := putArgs(roachpb.Key("a"), make([]byte, 1<<10)) + ba.Add(&pArg) + ctx := context.WithValue(context.Background(), magicKey{}, "foo") + if _, pErr := tc.Sender().Send(ctx, ba); !testutils.IsPError(pErr, propErr.Error()) { + t.Fatalf("expected error %v, found %v", propErr, pErr) + } + if curQuota := tc.repl.QuotaAvailable(); curQuota < minQuotaSize { + t.Fatalf("proposal quota not released: found=%d, want=%d", curQuota, minQuotaSize) + } +} + // TestQuotaPoolAccessOnDestroyedReplica tests the occurrence of #17303 where // following a leader replica getting destroyed, the scheduling of // handleRaftReady twice on the replica would cause a panic when @@ -7280,13 +7323,13 @@ func TestReplicaRetryRaftProposal(t *testing.T) { var wrongLeaseIndex uint64 // populated below tc.repl.mu.Lock() - tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) { + tc.repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) { if v := p.ctx.Value(magicKey{}); v != nil { if curAttempt := atomic.AddInt32(&c, 1); curAttempt == 1 { - return wrongLeaseIndex + return wrongLeaseIndex, nil } } - return 0 + return 0, nil } tc.repl.mu.Unlock() @@ -7731,12 +7774,12 @@ func TestReplicaRefreshMultiple(t *testing.T) { t.Fatalf("test requires LeaseAppliedIndex >= 2 at this point, have %d", ai) } assigned := false - repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) { + repl.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) { if p == proposal && !assigned { assigned = true - return ai - 1 + return ai - 1, nil } - return 0 + return 0, nil } repl.mu.Unlock() @@ -7877,16 +7920,16 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) { r := tc.repl r.mu.Lock() - r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64) { + r.mu.proposalBuf.testing.leaseIndexFilter = func(p *ProposalData) (indexOverride uint64, _ error) { // We're going to recognize the first time the commnand for the // EndTransaction is proposed and we're going to hackily decrease its // MaxLeaseIndex, so that the processing gets rejected further on. ut := p.Local.UpdatedTxns if atomic.LoadInt64(&proposalRecognized) == 0 && ut != nil && len(*ut) == 1 && (*ut)[0].ID == txn.ID { atomic.StoreInt64(&proposalRecognized, 1) - return p.command.MaxLeaseIndex - 1 + return p.command.MaxLeaseIndex - 1, nil } - return 0 + return 0, nil } r.mu.Unlock()