Skip to content

Commit

Permalink
rangefeed: add non-fatal assertion for commit intent resolve ts
Browse files Browse the repository at this point in the history
This patch adds a non-fatal assertion for commit intent operations that
are emitted below the resolved timestamp. A known bug will do just this,
so the assertion is kept non-fatal to avoid disruption, but it will
confirm whether a cluster is hitting the bug.

The log messages are of the form:

```
E240123 10:49:00.052256 57 kv/kvserver/rangefeed/resolved_timestamp.go:162  [n1,s1,r7/1:‹/Table/{3-4}›,rangefeed] 1  resolved timestamp 0.000000024,0 equal to or above timestamp of operation txn_id:a55c0992-581b-4cb2-97bf-fd9e22f7a657 key:"foo" timestamp:<wall_time:45 > value:"bar"
```

Epic: none
Release note: None
  • Loading branch information
erikgrinaker committed Jan 23, 2024
1 parent 0ad77cf commit 4ddc7ae
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 51 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (p *Processor) consumeLogicalOps(

// Determine whether the operation caused the resolved timestamp to
// move forward. If so, publish a RangeFeedCheckpoint notification.
if p.rts.ConsumeLogicalOp(op) {
if p.rts.ConsumeLogicalOp(ctx, op) {
p.publishCheckpoint(ctx)
}
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/kv/kvserver/rangefeed/resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ package rangefeed
import (
"bytes"
"container/heap"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -128,15 +130,15 @@ func (rts *resolvedTimestamp) ForwardClosedTS(newClosedTS hlc.Timestamp) bool {
// operation within its range of tracked keys. This allows the structure to
// update its internal intent tracking to reflect the change. The method returns
// whether this caused the resolved timestamp to move forward.
func (rts *resolvedTimestamp) ConsumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
if rts.consumeLogicalOp(op) {
func (rts *resolvedTimestamp) ConsumeLogicalOp(ctx context.Context, op enginepb.MVCCLogicalOp) bool {
if rts.consumeLogicalOp(ctx, op) {
return rts.recompute()
}
rts.assertNoChange()
return false
}

func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
func (rts *resolvedTimestamp) consumeLogicalOp(ctx context.Context, op enginepb.MVCCLogicalOp) bool {
switch t := op.GetValue().(type) {
case *enginepb.MVCCWriteValueOp:
rts.assertOpAboveRTS(op, t.Timestamp)
Expand All @@ -154,6 +156,12 @@ func (rts *resolvedTimestamp) consumeLogicalOp(op enginepb.MVCCLogicalOp) bool {
return rts.intentQ.UpdateTS(t.TxnID, t.Timestamp)

case *enginepb.MVCCCommitIntentOp:
// A known bug will trip this assertion, so don't fatal to avoid disruption.
// See: https://github.com/cockroachdb/cockroach/issues/104309
if t.Timestamp.LessEq(rts.resolvedTS) {
log.Errorf(ctx, "resolved timestamp %s equal to or above timestamp of operation %v",
rts.resolvedTS, t)
}
return rts.intentQ.DecrRef(t.TxnID, t.Timestamp)

case *enginepb.MVCCAbortIntentOp:
Expand Down
Loading

0 comments on commit 4ddc7ae

Please sign in to comment.