Merge pull request #118415 from erikgrinaker/backport23.1-117612
release-23.1: rangefeed: fix premature checkpoint due to intent resolution race
erikgrinaker authored Feb 1, 2024
2 parents 0778c78 + 2c5fc9d commit 8f3b31e
Showing 10 changed files with 549 additions and 84 deletions.
5 changes: 5 additions & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
@@ -71,6 +71,8 @@ go_test(
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
@@ -82,14 +84,17 @@ go_test(
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/future",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/retry",
"//pkg/util/stop",
326 changes: 326 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
@@ -14,26 +14,34 @@ import (
"context"
"runtime/pprof"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/storageutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/assert"
@@ -1155,3 +1163,321 @@ func TestRangeFeedStartTimeExclusive(t *testing.T) {
t.Fatal("timed out waiting for event")
}
}

// TestRangeFeedIntentResolutionRace is a regression test for
// https://github.com/cockroachdb/cockroach/issues/104309, i.e. the
// following scenario:
//
// - A rangefeed is running on a lagging follower.
// - A txn writes an intent, which is replicated to the follower.
// - The closed timestamp advances past the intent.
// - The txn commits and resolves the intent at the original write timestamp,
// then GCs its txn record. This is not yet applied on the follower.
// - The rangefeed pushes the txn to advance its resolved timestamp.
// - The txn is GCed, so the push returns ABORTED (it can't know whether the
// txn was committed or aborted after its record is GCed).
// - The rangefeed disregards the "aborted" txn and advances the resolved
// timestamp, emitting a checkpoint.
// - The follower applies the resolved intent and emits an event below
// the checkpoint, violating the checkpoint guarantee.
//
// This scenario is addressed by running a Barrier request through Raft and
// waiting for it to apply locally before removing the txn from resolved ts
// tracking. This ensures the pending intent resolution is applied before
// the resolved ts can advance (see the illustrative sketch after this file).
func TestRangeFeedIntentResolutionRace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // too slow, times out
skip.UnderDeadlock(t)

// Use a timeout, to prevent a hung test.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
// defer cancel() after Stopper.Stop(), so the context cancels first.
// Otherwise, the stopper will hang waiting for a rangefeed whose context is
// not yet cancelled.

// Set up an apply filter that blocks Raft application on n3 (follower) for
// the given range.
var blockRangeOnN3 atomic.Int64
unblockRangeC := make(chan struct{})
applyFilter := func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
if args.StoreID == 3 {
if rangeID := blockRangeOnN3.Load(); rangeID > 0 && rangeID == int64(args.RangeID) {
t.Logf("blocked r%d on s%d", args.RangeID, args.StoreID)
select {
case <-unblockRangeC:
t.Logf("unblocked r%d on s%d", args.RangeID, args.StoreID)
case <-ctx.Done():
return 0, kvpb.NewError(ctx.Err())
}
}
}
return 0, nil
}

// Set up a request filter that blocks transaction pushes for a specific key.
// This is used to prevent the rangefeed txn pusher from pushing the txn
// timestamp above the closed timestamp before it commits, only allowing the
// push to happen after the transaction has committed and GCed below the
// closed timestamp.
var blockPush atomic.Pointer[roachpb.Key]
unblockPushC := make(chan struct{})
reqFilter := func(ctx context.Context, br *kvpb.BatchRequest) *kvpb.Error {
if br.IsSinglePushTxnRequest() {
req := br.Requests[0].GetPushTxn()
if key := blockPush.Load(); key != nil && req.Key.Equal(*key) {
t.Logf("blocked push for txn %s", req.PusheeTxn)
select {
case <-unblockPushC:
t.Logf("unblocked push for txn %s", req.PusheeTxn)
case <-ctx.Done():
return kvpb.NewError(ctx.Err())
}
}
}
return nil
}

// Speed up the test by reducing various closed/resolved timestamp intervals.
const interval = 100 * time.Millisecond
st := cluster.MakeTestingClusterSettings()
kvserver.RangeFeedRefreshInterval.Override(ctx, &st.SV, interval)
closedts.SideTransportCloseInterval.Override(ctx, &st.SV, interval)
closedts.TargetDuration.Override(ctx, &st.SV, interval)

// Start a cluster with 3 nodes.
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: reqFilter,
TestingApplyCalledTwiceFilter: applyFilter,
RangeFeedPushTxnsInterval: interval,
RangeFeedPushTxnsAge: interval,
},
},
},
})
defer tc.Stopper().Stop(ctx)
defer cancel()

n1 := tc.Server(0)
s3 := tc.GetFirstStoreFromServer(t, 2)
clock := n1.Clock()

// Determine the key/value we're going to write.
prefix := append(n1.Codec().TenantPrefix(), keys.ScratchRangeMin...)
key := append(prefix.Clone(), []byte("/foo")...)
value := []byte("value")

// Split off a range and upreplicate it, with leaseholder on n1.
_, _, err := n1.SplitRange(prefix)
require.NoError(t, err)
desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...)
t.Logf("split off range %s", desc)

repl1 := tc.GetFirstStoreFromServer(t, 0).LookupReplica(roachpb.RKey(prefix)) // leaseholder
repl3 := tc.GetFirstStoreFromServer(t, 2).LookupReplica(roachpb.RKey(prefix)) // lagging follower

require.True(t, repl1.OwnsValidLease(ctx, clock.NowAsClockTimestamp()))

// Block pushes of the txn, to ensure it can write at a fixed timestamp.
// Otherwise, the rangefeed or someone else may push it beyond the closed
// timestamp.
blockPush.Store(&key)

// We'll use n3 as our lagging follower. Start a rangefeed on it directly.
req := kvpb.RangeFeedRequest{
Header: kvpb.Header{
RangeID: desc.RangeID,
},
Span: desc.RSpan().AsRawSpanWithNoLocals(),
}
eventC := make(chan *kvpb.RangeFeedEvent)
sink := newChannelSink(ctx, eventC)
fErr := future.MakeAwaitableFuture(s3.RangeFeed(&req, sink))
require.NoError(t, fErr.Get()) // check if we've errored yet
t.Logf("started rangefeed on %s", repl3)

// Spawn a rangefeed monitor, which posts checkpoint updates to checkpointC.
// This uses a buffered channel of size 1, and empties it out before posting a
// new update, such that it contains the latest known checkpoint and does not
// block the rangefeed. It also posts emitted values for our key to valueC,
// which should only happen once.
checkpointC := make(chan *kvpb.RangeFeedCheckpoint, 1)
valueC := make(chan *kvpb.RangeFeedValue, 1)
go func() {
defer close(checkpointC)
defer close(valueC)
for {
select {
case e := <-eventC:
switch {
case e.Checkpoint != nil:
// Clear checkpointC such that it always contains the latest.
select {
case <-checkpointC:
default:
}
checkpointC <- e.Checkpoint
case e.Val != nil && e.Val.Key.Equal(key):
select {
case valueC <- e.Val:
default:
t.Errorf("duplicate value event for %s: %s", key, e)
}
}
case <-ctx.Done():
return
}
}
}()

waitForCheckpoint := func(t *testing.T, ts hlc.Timestamp) hlc.Timestamp {
t.Helper()
timeoutC := time.After(3 * time.Second)
for {
select {
case c := <-checkpointC:
require.NotNil(t, c, "nil checkpoint")
if ts.LessEq(c.ResolvedTS) {
t.Logf("rangefeed checkpoint at %s >= %s", c.ResolvedTS, ts)
return c.ResolvedTS
}
case <-timeoutC:
require.Fail(t, "timed out waiting for checkpoint", "wanted %s", ts)
}
}
}

// Wait for the initial checkpoint.
rts := waitForCheckpoint(t, clock.Now())
t.Logf("initial checkpoint at %s", rts)

// Start a new transaction on n1 with a fixed timestamp (to make sure it
// remains below the closed timestamp). Write an intent, and read it back to
// make sure it has applied.
writeTS := clock.Now()
txn := n1.DB().NewTxn(ctx, "test")
require.NoError(t, txn.SetFixedTimestamp(ctx, writeTS))
require.NoError(t, txn.Put(ctx, key, value))
_, err = txn.Get(ctx, key)
require.NoError(t, err)
t.Logf("wrote %s", key)

// Wait for both the leaseholder and the follower to close the transaction's
// write timestamp.
waitForClosedTimestamp := func(t *testing.T, repl *kvserver.Replica, ts hlc.Timestamp) hlc.Timestamp {
t.Helper()
timeoutC := time.After(3 * time.Second)
for {
if closedTS := repl.GetCurrentClosedTimestamp(ctx); ts.LessEq(closedTS) {
t.Logf("replica %s closed timestamp at %s >= %s", repl, closedTS, ts)
return closedTS
}
select {
case <-time.After(10 * time.Millisecond):
case <-timeoutC:
require.Fail(t, "timeout out waiting for closed timestamp", "wanted %s", ts)
}
}
}
cts := waitForClosedTimestamp(t, repl1, writeTS)
_ = waitForClosedTimestamp(t, repl3, writeTS)
t.Logf("closed timestamp %s is above txn write timestamp %s", cts, writeTS)

// Wait for the rangefeed checkpoint to reach writeTS.Prev(). This ensures the
// rangefeed's view of the closed timestamp has been updated, and is now only
// blocked by the intent.
waitTS := writeTS.Prev()
waitTS.Logical = 0
rts = waitForCheckpoint(t, waitTS)
t.Logf("rangefeed caught up to txn write timestamp at %s", rts)

// Block Raft application on repl3.
blockRangeOnN3.Store(int64(desc.RangeID))

// Commit the transaction, and check its commit timestamp.
require.NoError(t, txn.Commit(ctx))
commitTS := txn.CommitTimestamp()
require.Equal(t, commitTS, writeTS)
t.Logf("txn committed at %s", writeTS)

// Unblock transaction pushes. Since repl3 won't apply the intent resolution
// yet, the rangefeed will keep trying to push the transaction. Once the
// transaction record is GCed (which happens async), the rangefeed will see an
// ABORTED status.
//
// It may see the intermediate COMMITTED state too, but at the time of writing
// that does not matter, since the rangefeed needs to keep tracking the
// intent until it applies the resolution, and it will also see the ABORTED
// status before that happens.
blockPush.Store(nil)
close(unblockPushC)

// Make sure repl3 does not emit a checkpoint beyond the write timestamp. Its
// closed timestamp has already advanced past it, but the unresolved intent
// should prevent the resolved timestamp from advancing, despite the false
// ABORTED state. We also make sure no value has been emitted.
waitC := time.After(3 * time.Second)
for done := false; !done; {
select {
case c := <-checkpointC:
require.NotNil(t, c)
require.False(t, commitTS.LessEq(c.ResolvedTS),
"repl %s emitted checkpoint %s beyond write timestamp %s", repl3, c.ResolvedTS, commitTS)
case v := <-valueC:
require.Fail(t, "repl3 emitted premature value %s", v)
case <-waitC:
done = true
}
}
t.Logf("checkpoint still below write timestamp")

// Unblock application on repl3. Wait for the checkpoint to catch up to the
// commit timestamp, and the committed value to be emitted.
blockRangeOnN3.Store(0)
close(unblockRangeC)

rts = waitForCheckpoint(t, writeTS)
t.Logf("checkpoint %s caught up to write timestamp %s", rts, writeTS)

select {
case v := <-valueC:
require.Equal(t, v.Key, key)
require.Equal(t, v.Value.Timestamp, writeTS)
t.Logf("received value %s", v)
case <-time.After(3 * time.Second):
require.Fail(t, "timed out waiting for event")
}

// The rangefeed should still be running.
require.NoError(t, fErr.Get())
}

// channelSink is a rangefeed sink which posts events to a channel.
type channelSink struct {
ctx context.Context
ch chan<- *kvpb.RangeFeedEvent
}

func newChannelSink(ctx context.Context, ch chan<- *kvpb.RangeFeedEvent) *channelSink {
return &channelSink{ctx: ctx, ch: ch}
}

func (c *channelSink) Context() context.Context {
return c.ctx
}

func (c *channelSink) Send(e *kvpb.RangeFeedEvent) error {
select {
case c.ch <- e:
return nil
case <-c.ctx.Done():
return c.ctx.Err()
}
}
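
The doc comment above describes the fix only in prose. As a reading aid, here is a minimal Go sketch of the ordering it enforces, assuming hypothetical stand-in interfaces (Replica, ResolvedTSTracker and their methods are invented for illustration and are not the actual kvserver types; the real change threads a Barrier request through the rangefeed txn pusher):

// Package sketch illustrates the ordering invariant behind the fix. All
// names here (Replica, ResolvedTSTracker, Barrier, Untrack) are hypothetical
// stand-ins, not the actual kvserver API.
package sketch

import "context"

// Replica exposes the one primitive the fix relies on: a Barrier that goes
// through Raft and returns only after it has applied on this replica. Since
// Raft applies commands in order, any intent resolution submitted before the
// Barrier must have applied locally by the time Barrier returns.
type Replica interface {
	Barrier(ctx context.Context) error
}

// ResolvedTSTracker tracks txns whose intents still hold back the resolved
// timestamp; Untrack lets the resolved timestamp advance past the txn.
type ResolvedTSTracker interface {
	Untrack(txnID string)
}

// forgetAbortedTxn is the core of the fix: after a push reports ABORTED
// (possibly falsely, because the txn record was GCed after a commit), wait
// for a Barrier to apply before untracking. This guarantees a checkpoint can
// never be emitted ahead of a pending intent-resolution event.
func forgetAbortedTxn(ctx context.Context, r Replica, tr ResolvedTSTracker, txnID string) error {
	if err := r.Barrier(ctx); err != nil {
		// Keep tracking the txn; a later push attempt will retry the barrier.
		return err
	}
	tr.Untrack(txnID)
	return nil
}

The point of the sketch is the ordering: Barrier must return (i.e. apply locally) strictly before Untrack, otherwise the checkpoint guarantee exercised by the test above can be violated.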
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
"//pkg/storage/enginepb",
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/contextutil",
"//pkg/util/envutil",
"//pkg/util/future",
"//pkg/util/hlc",
