kvserver: record metrics for ErrProposalDropped #100083

Merged
merged 15 commits into from Jun 20, 2023
48 changes: 33 additions & 15 deletions pkg/kv/kvserver/metrics.go
@@ -1113,6 +1113,20 @@ or the delegate being too busy to send.
Measurement: "Ticks",
Unit: metric.Unit_COUNT,
}
metaRaftProposalsDropped = metric.Metadata{
Name: "raft.dropped",
Help: "Number of Raft proposals dropped (this counts individial raftpb.Entry, not raftpb.MsgProp)",
Measurement: "Proposals",
Unit: metric.Unit_COUNT,
}
metaRaftProposalsDroppedLeader = metric.Metadata{
Name: "raft.dropped_leader",
Help: "Number of Raft proposals dropped by a Replica that believes itself to be the leader; " +
"each update also increments `raft.dropped` " +
"(this counts individial raftpb.Entry, not raftpb.MsgProp)",
Measurement: "Proposals",
Unit: metric.Unit_COUNT,
}
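Aside (not part of the diff): since every raft.dropped_leader increment also bumps raft.dropped, drops observed while the replica did not consider itself the leader can be derived as raft.dropped minus raft.dropped_leader.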
metaRaftWorkingDurationNanos = metric.Metadata{
Name: "raft.process.workingnanos",
Help: `Nanoseconds spent in store.processRaft() working.
@@ -2256,20 +2270,22 @@ type StoreMetrics struct {
DelegateSnapshotInProgress *metric.Gauge

// Raft processing metrics.
RaftTicks *metric.Counter
RaftQuotaPoolPercentUsed metric.IHistogram
RaftWorkingDurationNanos *metric.Counter
RaftTickingDurationNanos *metric.Counter
RaftCommandsApplied *metric.Counter
RaftLogCommitLatency metric.IHistogram
RaftCommandCommitLatency metric.IHistogram
RaftHandleReadyLatency metric.IHistogram
RaftApplyCommittedLatency metric.IHistogram
RaftSchedulerLatency metric.IHistogram
RaftTimeoutCampaign *metric.Counter
RaftStorageReadBytes *metric.Counter
WALBytesWritten *metric.Gauge
WALBytesIn *metric.Gauge
RaftTicks *metric.Counter
RaftProposalsDropped *metric.Counter
RaftProposalsDroppedLeader *metric.Counter
RaftQuotaPoolPercentUsed metric.IHistogram
RaftWorkingDurationNanos *metric.Counter
RaftTickingDurationNanos *metric.Counter
RaftCommandsApplied *metric.Counter
RaftLogCommitLatency metric.IHistogram
RaftCommandCommitLatency metric.IHistogram
RaftHandleReadyLatency metric.IHistogram
RaftApplyCommittedLatency metric.IHistogram
RaftSchedulerLatency metric.IHistogram
RaftTimeoutCampaign *metric.Counter
RaftStorageReadBytes *metric.Counter
WALBytesWritten *metric.Gauge
WALBytesIn *metric.Gauge

// Raft message metrics.
//
@@ -2885,7 +2901,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
DelegateSnapshotInProgress: metric.NewGauge(metaDelegateSnapshotInProgress),

// Raft processing metrics.
RaftTicks: metric.NewCounter(metaRaftTicks),
RaftTicks: metric.NewCounter(metaRaftTicks),
RaftProposalsDropped: metric.NewCounter(metaRaftProposalsDropped),
RaftProposalsDroppedLeader: metric.NewCounter(metaRaftProposalsDroppedLeader),
RaftQuotaPoolPercentUsed: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaRaftQuotaPoolPercentUsed,
Duration: histogramWindow,
101 changes: 67 additions & 34 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -142,13 +142,19 @@ type admitEntHandle struct {
pCtx context.Context
}

// singleBatchProposer is the subset of proposer needed by proposeBatch to
// propose a single batch of entries to raft.
type singleBatchProposer interface {
getReplicaID() roachpb.ReplicaID
flowControlHandle(ctx context.Context) kvflowcontrol.Handle
onErrProposalDropped([]raftpb.Entry, raft.StateType)
}

// A proposer is an object that uses a propBuf to coordinate Raft proposals.
type proposer interface {
locker() sync.Locker
rlocker() sync.Locker

// The following require the proposer to hold (at least) a shared lock.
getReplicaID() roachpb.ReplicaID
singleBatchProposer
destroyed() destroyStatus
firstIndex() kvpb.RaftIndex
leaseAppliedIndex() kvpb.LeaseAppliedIndex
@@ -157,7 +163,6 @@ type proposer interface {
leaderStatus(ctx context.Context, raftGroup proposerRaft) rangeLeaderInfo
ownsValidLease(ctx context.Context, now hlc.ClockTimestamp) bool
shouldCampaignOnRedirect(raftGroup proposerRaft) bool
flowControlHandle(ctx context.Context) kvflowcontrol.Handle

// The following require the proposer to hold an exclusive lock.
withGroupLocked(func(proposerRaft) error) error
@@ -417,13 +422,21 @@ func (b *propBuf) FlushLockedWithRaftGroup(
// at once. Building up batches of entries and proposing them with a single
// Step can dramatically reduce the number of messages required to commit
// and apply them.
buf := b.arr.asSlice()[:used]
ents := make([]raftpb.Entry, 0, used)

ents := make([]raftpb.Entry, 0, used)
// Use this slice to track, for each entry that's proposed to raft, whether
// it's subject to replication admission control. Updated in tandem with
// slice above.
admitHandles := make([]admitEntHandle, 0, used)
// INVARIANT: buf[firstProp:nextProp] lines up with the ents slice.
firstProp, nextProp := 0, 0
buf := b.arr.asSlice()[:used]
defer func() {
// Clear buffer.
for i := range buf {
buf[i] = nil
}
}()

// Compute the closed timestamp target, which will be used to assign a closed
// timestamp to all proposals in this batch.
@@ -439,7 +452,6 @@
log.Fatalf(ctx, "unexpected nil proposal in buffer")
return 0, nil // unreachable, for linter
}
buf[i] = nil // clear buffer
reproposal := !p.tok.stillTracked()

// Conditionally reject the proposal based on the state of the raft group.
@@ -508,16 +520,14 @@
// Flush any previously batched (non-conf change) proposals to
// preserve the correct ordering of proposals. Later proposals
// will start a new batch.
propErr, dropped := proposeBatch(raftGroup, b.p.getReplicaID(), ents)
propErr := proposeBatch(ctx, b.p, raftGroup, ents, admitHandles, buf[firstProp:nextProp])
if propErr != nil {
firstErr = propErr
continue
}
if !dropped {
b.maybeDeductFlowTokens(ctx, admitHandles, ents)
}

ents = ents[len(ents):]
firstProp, nextProp = i, i
admitHandles = admitHandles[len(admitHandles):]

confChangeCtx := kvserverpb.ConfChangeContext{
@@ -559,6 +569,7 @@
ents = append(ents, raftpb.Entry{
Data: p.encodedCommand,
})
nextProp++
log.VEvent(p.ctx, 2, "flushing proposal to Raft")

// We don't want to deduct flow tokens for reproposed commands, and of
@@ -578,19 +589,7 @@
return 0, firstErr
}

propErr, dropped := proposeBatch(raftGroup, b.p.getReplicaID(), ents)
if propErr == nil && !dropped {
// Now that we know what raft log position[1] this proposal is to end up
// in, deduct flow tokens for it. This is done without blocking (we've
// already waited for available flow tokens pre-evaluation). The tokens
// will later be returned once we're informed of the entry being
// admitted below raft.
//
// [1]: We're relying on an undocumented side effect of upstream raft
// API where it populates the index and term for the passed in
// slice of entries. See etcd-io/raft#57.
b.maybeDeductFlowTokens(ctx, admitHandles, ents)
}
propErr := proposeBatch(ctx, b.p, raftGroup, ents, admitHandles, buf[firstProp:nextProp])
return used, propErr
}

@@ -949,29 +948,55 @@ func (b *propBuf) forwardClosedTimestampLocked(closedTS hlc.Timestamp) bool {
}

func proposeBatch(
raftGroup proposerRaft, replID roachpb.ReplicaID, ents []raftpb.Entry,
) (_ error, dropped bool) {
ctx context.Context,
p singleBatchProposer,
raftGroup proposerRaft,
ents []raftpb.Entry,
handles []admitEntHandle,
props []*ProposalData,
) (_ error) {
if len(ents) != len(props) {
return errors.AssertionFailedf("ents and props don't match up: %v and %v", ents, props)
Collaborator (review comment):
This error would contain quite a lot of data, including proposals. How about only reporting len(ents) and len(props)?

Member (Author):
Maybe - but it's a clear programming error, so I don't want to go crazy and definitely don't want to under-report information, in case the error is rare. I'll leave as is unless you feel strongly.

Collaborator:
I don't know how far up the stack this error can make it, whether it can make it into text logs or get outside the process. It can contain user data, right? So that's a bit sensitive. Maybe err on the safe side, and include only necessary info in the error message. See a similar panic in maybeDeductFlowTokens.
}
if len(ents) == 0 {
return nil, false
return nil
}
if err := raftGroup.Step(raftpb.Message{
replID := p.getReplicaID()
err := raftGroup.Step(raftpb.Message{
Type: raftpb.MsgProp,
From: uint64(replID),
Entries: ents,
}); errors.Is(err, raft.ErrProposalDropped) {
})
if err != nil && errors.Is(err, raft.ErrProposalDropped) {
// Silently ignore dropped proposals (they were always silently
// ignored prior to the introduction of ErrProposalDropped).
// TODO(bdarnell): Handle ErrProposalDropped better.
// https://github.com/cockroachdb/cockroach/issues/21849
return nil, true
} else if err != nil {
return err, false
for _, p := range props {
if p.ctx != nil {
log.Event(p.ctx, "entry dropped")
}
}
p.onErrProposalDropped(ents, raftGroup.BasicStatus().RaftState)
return nil //nolint:returnerrcheck
}
if err == nil {
// Now that we know what raft log position[1] this proposal is to end up
// in, deduct flow tokens for it. This is done without blocking (we've
// already waited for available flow tokens pre-evaluation). The tokens
// will later be returned once we're informed of the entry being
// admitted below raft.
//
// [1]: We're relying on an undocumented side effect of upstream raft
// API where it populates the index and term for the passed in
// slice of entries. See etcd-io/raft#57.
maybeDeductFlowTokens(ctx, p.flowControlHandle(ctx), handles, ents)
}
return nil, false
return err
}

func (b *propBuf) maybeDeductFlowTokens(
ctx context.Context, admitHandles []admitEntHandle, ents []raftpb.Entry,
func maybeDeductFlowTokens(
ctx context.Context, h kvflowcontrol.Handle, admitHandles []admitEntHandle, ents []raftpb.Entry,
) {
if len(admitHandles) != len(ents) || cap(admitHandles) != cap(ents) {
panic(
Expand All @@ -988,7 +1013,7 @@ func (b *propBuf) maybeDeductFlowTokens(
return "<omitted>"
}),
)
b.p.flowControlHandle(ctx).DeductTokensFor(
h.DeductTokensFor(
admitHandle.pCtx,
admissionpb.WorkPriority(admitHandle.handle.AdmissionPriority),
kvflowcontrolpb.RaftLogPosition{
@@ -1258,6 +1283,14 @@ func (rp *replicaProposer) withGroupLocked(fn func(raftGroup proposerRaft) error
})
}

func (rp *replicaProposer) onErrProposalDropped(ents []raftpb.Entry, stateType raft.StateType) {
n := int64(len(ents))
rp.store.metrics.RaftProposalsDropped.Inc(n)
if stateType == raft.StateLeader {
rp.store.metrics.RaftProposalsDroppedLeader.Inc(n)
}
}

func (rp *replicaProposer) leaseDebugRLocked() string {
return rp.mu.state.Lease.String()
}
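For illustration, here is a hypothetical singleBatchProposer (not part of this PR) that counts drops in plain fields instead of bumping the store-level counters that replicaProposer uses; it assumes it sits in the kvserver package so the referenced types are in scope, and countingProposer and its fields are invented names:

	// countingProposer records dropped entries per raft state instead of
	// incrementing store metrics.
	type countingProposer struct {
		dropped       int64
		droppedLeader int64
	}

	func (cp *countingProposer) getReplicaID() roachpb.ReplicaID { return 1 }

	func (cp *countingProposer) flowControlHandle(ctx context.Context) kvflowcontrol.Handle {
		return nil // this sketch does not exercise replication admission control
	}

	func (cp *countingProposer) onErrProposalDropped(ents []raftpb.Entry, state raft.StateType) {
		// Mirror the metric semantics: every dropped entry is counted, and
		// leader-side drops are additionally counted on their own.
		cp.dropped += int64(len(ents))
		if state == raft.StateLeader {
			cp.droppedLeader += int64(len(ents))
		}
	}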
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -186,6 +186,8 @@ func (t *testProposer) withGroupLocked(fn func(proposerRaft) error) error {
return fn(t.raftGroup)
}

func (rp *testProposer) onErrProposalDropped(ents []raftpb.Entry, stateType raft.StateType) {}

func (t *testProposer) leaseDebugRLocked() string {
return ""
}
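Aside (not part of the diff): the testProposer stub above is a no-op, presumably so the existing propBuf unit tests satisfy the widened proposer interface without asserting on drops; the actual metric increments are wired up only through replicaProposer.onErrProposalDropped in replica_proposal_buf.go.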