Skip to content

Commit

Permalink
storage: ensure that spans of reproposals of applied commands are not…
Browse files Browse the repository at this point in the history
… used

This commit deals with the case where more than one reproposal of a command
exists at the same MaxAppliedIndex. Code that already exist prevents the
command from being finished more than once but leaves open the possibility of
writing events into a potentially closed span.

Release note: None
  • Loading branch information
ajwerner authored and nvanbenschoten committed Aug 1, 2019
1 parent 61651a2 commit 5bd37e8
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pkg/storage/replica_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,11 @@ func (r *Replica) applyCmdAppBatch(
}
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
cmd := it.cur()
// Reset the context for already applied commands to ensure that
// reproposals at the same MaxLeaseIndex do not record into closed spans.
if cmd.proposedLocally() && cmd.proposal.applied {
cmd.ctx = ctx
}
for _, sc := range cmd.replicatedResult().SuggestedCompactions {
r.store.compactor.Suggest(cmd.ctx, sc)
}
Expand Down
103 changes: 103 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/proto"
"github.com/kr/pretty"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -11718,6 +11719,108 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
}
}

// TestLaterReproposalsDoNotReuseContext ensures that when commands are
// reproposed more than once at the same MaxLeaseIndex and the first command
// applies that the later reproposals do not log into the proposal's context
// as its underlying trace span may already be finished.
func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
defer leaktest.AfterTest(t)()

// Set the trace infrastructure to log if a span is used after being finished.
defer enableTraceDebugUseAfterFree()()

tc := testContext{}
ctx := context.Background()

// Set logging up to a test specific directory.
scope := log.Scope(t)
defer scope.Close(t)

stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cfg := TestStoreConfig(hlc.NewClock(hlc.UnixNano, time.Nanosecond))
// Set up tracing.
tracer := tracing.NewTracer()
tracer.Configure(&cfg.Settings.SV)
tracer.AlwaysTrace()
cfg.AmbientCtx.Tracer = tracer
tc.StartWithStoreConfig(t, stopper, cfg)
key := roachpb.Key("a")
lease, _ := tc.repl.GetLease()
txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
ba := roachpb.BatchRequest{
Header: roachpb.Header{
RangeID: tc.repl.RangeID,
Txn: txn,
},
}
ba.Timestamp = txn.OrigTimestamp
ba.Add(&roachpb.PutRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
Value: roachpb.MakeValueFromBytes([]byte("val")),
})

// Hold the RaftLock to encourage the reproposals to occur in the same batch.
tc.repl.RaftLock()
sp := tracer.StartRootSpan("replica send", logtags.FromContext(ctx), tracing.RecordableSpan)
tracedCtx := opentracing.ContextWithSpan(ctx, sp)
// Go out of our way to enable recording so that expensive logging is enabled
// for this context.
tracing.StartRecording(sp, tracing.SingleNodeRecording)
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, lease, &ba, &allSpans, endCmds{})
if pErr != nil {
t.Fatal(pErr)
}
// Launch a goroutine to finish the span as soon as a result has been sent.
errCh := make(chan *roachpb.Error)
go func() {
res := <-ch
sp.Finish()
errCh <- res.Err
}()

// Flush the proposal and then repropose it twice.
// This test verifies that these later reproposals don't record into the
// tracedCtx after its span has been finished.
func() {
tc.repl.mu.Lock()
defer tc.repl.mu.Unlock()
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange)
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
t.Fatal(err)
}
tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange)
}()
tc.repl.RaftUnlock()

if pErr = <-errCh; pErr != nil {
t.Fatal(pErr)
}
// Round trip another proposal through the replica to ensure that previously
// committed entries have been applied.
_, pErr = tc.repl.sendWithRangeID(ctx, tc.repl.RangeID, ba)
if pErr != nil {
t.Fatal(pErr)
}

stopper.Quiesce(ctx)
// Check and see if the trace package logged an error.
log.Flush()
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
regexp.MustCompile("net/trace"))
if err != nil {
t.Fatal(err)
}
if len(entries) > 0 {
t.Fatalf("reused span after free: %v", entries)
}
}

func enableTraceDebugUseAfterFree() (restore func()) {
prev := trace.DebugUseAfterFinish
trace.DebugUseAfterFinish = true
Expand Down

0 comments on commit 5bd37e8

Please sign in to comment.