Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
storage: ensure that the last reproposal finishes a command
Browse files Browse the repository at this point in the history
This commit ensures that when a command has been reproposed more than once,
the reproposal with the highest MaxLeaseIndex ultimately finishes the
command. This commit and the associated test eliminate panics we've seen due to
later reproposals recording events into spans which have already been finished.

The next commit will deal with preventing recording into a context from a later
reproposal at the same MaxLeaseIndex.

Release note (bug fix): prevent panic due to recording into finished tracing
spans caused by acknowledging an earlier failed reproposal when a later
reproposal exists.
ajwerner authored and nvanbenschoten committed Aug 1, 2019

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent b6930a6 commit 61651a2
Showing 4 changed files with 192 additions and 12 deletions.
21 changes: 11 additions & 10 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
@@ -164,6 +164,16 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
cmd.proposal = r.mu.proposals[cmd.idKey]
if cmd.proposedLocally() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't consider this
// entry to have been proposed locally. The entry must necessarily be
// rejected by checkForcedErr.
cmd.proposal = nil
}
if cmd.proposedLocally() {
// We initiated this command, so use the caller-supplied context.
cmd.ctx = cmd.proposal.ctx
@@ -179,16 +189,7 @@ func (r *Replica) retrieveLocalProposals(ctx context.Context, b *cmdAppBatch) {
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
toRelease := int64(0)
shouldRemove := cmd.proposedLocally() &&
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
if cmd.proposedLocally() {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
3 changes: 3 additions & 0 deletions pkg/storage/replica_application_result.go
Original file line number Diff line number Diff line change
@@ -457,6 +457,9 @@ func (r *Replica) finishRaftCommand(ctx context.Context, cmd *cmdAppCtx) {
}

if cmd.proposedLocally() {
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
cmd.proposal.finishApplication(cmd.response)
} else if cmd.response.Err != nil {
log.VEventf(ctx, 1, "applying raft command resulted in error: %s", cmd.response.Err)
16 changes: 14 additions & 2 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
@@ -258,8 +258,20 @@ func (r *Replica) evalAndPropose(
//
// The method hands ownership of the command over to the Raft machinery. After
// the method returns, all access to the command must be performed while holding
// Replica.mu and Replica.raftMu.
func (r *Replica) propose(ctx context.Context, p *ProposalData) (int64, *roachpb.Error) {
// Replica.mu and Replica.raftMu. If a non-nil error is returned the
// MaxLeaseIndex is not updated.
func (r *Replica) propose(ctx context.Context, p *ProposalData) (index int64, pErr *roachpb.Error) {

// If an error occurs reset the command's MaxLeaseIndex to its initial value.
// Failure to propose will propagate to the client. An invariant of this
// package is that proposals which are finished carry a raft command with a
// MaxLeaseIndex equal to the proposal command's max lease index.
defer func(prev uint64) {
if pErr != nil {
p.command.MaxLeaseIndex = prev
}
}(p.command.MaxLeaseIndex)

// Make sure the maximum lease index is unset. This field will be set in
// propBuf.Insert and its encoded bytes will be appended to the encoding
// buffer as a RaftCommandFooter.
164 changes: 164 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
@@ -64,6 +64,7 @@ import (
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/raft/tracker"
"golang.org/x/net/trace"
)

// allSpans is a SpanSet that covers *everything* for use in tests that don't
@@ -11559,3 +11560,166 @@ func TestSplitSnapshotWarningStr(t *testing.T) {
splitSnapshotWarningStr(12, status),
)
}

// TestHighestMaxLeaseIndexReproposalFinishesCommand exercises a case where a
// command is reproposed twice at different MaxLeaseIndex values to ultimately
// fail with an error which cannot be reproposed (say due to a lease transfer
// or change to the gc threshold). This test works to exercise the invariant
// that when a proposal has been reproposed at different MaxLeaseIndex values
// the client is ultimately acknowledged with an error from a reproposal with
// the largest index. The test verifies this condition by asserting that the
// span used to trace the execution of the proposal is not used after the
// proposal has been finished.
func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set the trace infrastructure to log if a span is used after being finished.
defer enableTraceDebugUseAfterFree()()

// Set logging up to a test specific directory.
scope := log.Scope(t)
defer scope.Close(t)

tc := testContext{}
ctx := context.Background()

stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.manualClock = hlc.NewManualClock(123)
cfg := TestStoreConfig(hlc.NewClock(tc.manualClock.UnixNano, time.Nanosecond))
// Set up tracing.
tracer := tracing.NewTracer()
tracer.Configure(&cfg.Settings.SV)
cfg.AmbientCtx.Tracer = tracer

// Below we set txnID to the value of the transaction we're going to force to
// be proposed multiple times.
var txnID uuid.UUID
// In the TestingProposalFilter we populate cmdID with the id of the proposal
// which corresponds to txnID.
var cmdID storagebase.CmdIDKey
// After we evalAndPropose the command we populate prop with the ProposalData
// value to enable reproposing the same command more than once.
var prop *ProposalData
// seen is used to detect the first application of our proposal.
var seen bool
cfg.TestingKnobs = StoreTestingKnobs{
MaxApplicationBatchSize: 1,
// Set the TestingProposalFilter in order to know the CmdIDKey for our
// request by detecting its txnID.
TestingProposalFilter: func(args storagebase.ProposalFilterArgs) *roachpb.Error {
if args.Req.Header.Txn != nil && args.Req.Header.Txn.ID == txnID {
cmdID = args.CmdID
}
return nil
},
// Detect the application of the proposal to repropose it and also
// invalidate the lease.
TestingApplyFilter: func(args storagebase.ApplyFilterArgs) (retry int, pErr *roachpb.Error) {
// Only act on the first application of the command under test; every
// other command (and every later application) passes through untouched.
if seen || args.CmdID != cmdID {
return 0, nil
}
seen = true
// Repropose from a separate goroutine so as to not mess with the
// goldenProtosBelowRaft checks. The filter blocks until the reproposal
// has been handed to propose.
reproposed := make(chan struct{})
go func() {
if _, pErr := tc.repl.propose(prop.ctx, prop); pErr != nil {
panic(pErr)
}
close(reproposed)
}()
<-reproposed
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
// Flush the proposalBuf to ensure that the reproposal makes it into the
// Replica's proposal map.
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
panic(err)
}
// Increase the lease sequence so that future reproposals will fail with
// NotLeaseHolderError. This mimics the outcome of a leaseholder change
// slipping in between the application of the first proposal and the
// reproposals.
tc.repl.mu.state.Lease.Sequence++
// This return value will force another retry which will carry a yet
// higher MaxLeaseIndex and will trigger our invariant violation.
return int(proposalIllegalLeaseIndex),
roachpb.NewErrorf("forced error that can be reproposed at a higher index")
},
}
tc.StartWithStoreConfig(t, stopper, cfg)
key := roachpb.Key("a")
lease, _ := tc.repl.GetLease()
txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
// Publish the transaction ID so that the TestingProposalFilter above can
// recognize the request issued below.
txnID = txn.ID
ba := roachpb.BatchRequest{
Header: roachpb.Header{
RangeID: tc.repl.RangeID,
Txn: txn,
},
}
ba.Timestamp = txn.OrigTimestamp
ba.Add(&roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
Value: roachpb.MakeValueFromBytes([]byte("val")),
})

// Hold the RaftLock to ensure that after evalAndPropose our proposal is in
// the proposal map. Entries are only removed from that map underneath raft.
// We want to grab the proposal so that we can shove in an extra reproposal
// while the first proposal is being applied.
tc.repl.RaftLock()
tracedCtx, cleanup := tracing.EnsureContext(ctx, cfg.AmbientCtx.Tracer, "replica send")
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, lease, &ba, &allSpans, endCmds{})
if pErr != nil {
t.Fatal(pErr)
}
// Wait for the proposal's result in a separate goroutine. cleanup (returned
// by tracing.EnsureContext) is invoked as soon as the result arrives and
// before the error is forwarded — presumably finishing the traced span;
// confirm against tracing.EnsureContext.
errCh := make(chan *roachpb.Error)
go func() {
res := <-ch
cleanup()
errCh <- res.Err
}()

// With Replica.mu held, flush the proposal buffer, refresh the proposals and
// stash our proposal in prop so that the TestingApplyFilter can repropose it.
func() {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange)
prop = tc.repl.mu.proposals[cmdID]
}()
tc.repl.RaftUnlock()

// The client must be acknowledged with an error matching "NotLeaseHolder";
// any other outcome fails the test.
if pErr = <-errCh; !testutils.IsPError(pErr, "NotLeaseHolder") {
t.Fatal(pErr)
}

// Round trip another proposal through the replica to ensure that previously
// committed entries have been applied.
_, pErr = tc.repl.sendWithRangeID(ctx, tc.repl.RangeID, ba)
if pErr != nil {
t.Fatal(pErr)
}
log.Flush()

// Scan the flushed log files for entries matching "net/trace"; any such entry
// is treated as a use of a span after it was finished and fails the test.
stopper.Quiesce(ctx)
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("net/trace"))
if err != nil {
t.Fatal(err)
}
if len(entries) > 0 {
t.Fatalf("reused span after free: %v", entries)
}
}

// enableTraceDebugUseAfterFree switches trace.DebugUseAfterFinish on and
// returns a function that puts the flag back to the value it held when this
// helper was called. It is intended to be used as
//
//	defer enableTraceDebugUseAfterFree()()
//
// so that the flag is enabled for the duration of a test and restored
// afterwards.
func enableTraceDebugUseAfterFree() (restore func()) {
	// Capture the current setting before overwriting it so the returned
	// closure can restore exactly that value.
	original := trace.DebugUseAfterFinish
	restore = func() {
		trace.DebugUseAfterFinish = original
	}
	trace.DebugUseAfterFinish = true
	return restore
}

0 comments on commit 61651a2

Please sign in to comment.