109439: changefeedccl: Emit span resolved event when end time reached  r=miretskiy a=miretskiy

Changefeed supports a mode where the user wants to emit
all events that occurred since some time in the past (`cursor`)
and end the changefeed (`end_time`) at a time in the near future.

In this mode, the rangefeed catchup scan starting from the `cursor`
position could take some time -- maybe even a lot of time --
in which case the very first checkpoint the kvfeed observes
will be after `end_time`. All events, including checkpoints,
after `end_time` are skipped, as they should be.

However, this meant that no changefeed checkpoint
records could be produced until the entire changefeed completed.
This PR ensures that once `end_time` is reached, we emit
one resolved event for that span, so that the changefeed
can produce a span-based checkpoint if needed.
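
A minimal sketch of the idea (names like `onCheckpoint` and `emitResolved` are illustrative stand-ins, not the actual kvfeed API):

package kvfeedsketch

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// onCheckpoint handles a rangefeed checkpoint for a span. Previously, every
// checkpoint at or past end_time was dropped; here the first such checkpoint
// produces a single resolved event pinned just below end_time, so the
// changefeed can record a span-based checkpoint before it finishes.
func onCheckpoint(
	sp roachpb.Span, ts, endTime hlc.Timestamp,
	emitResolved func(roachpb.Span, hlc.Timestamp) error,
) error {
	if !endTime.IsEmpty() && endTime.LessEq(ts) {
		// The span has reached end_time: emit one resolved event at the
		// boundary (end_time.Prev(), matching the frontier check in the
		// test below) and consider the span complete.
		return emitResolved(sp, endTime.Prev())
	}
	return emitResolved(sp, ts)
}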

Fixes cockroachdb#108464

Release note: None

110267: roachtest: during c2c/shutdown, shutdown main driver if shutdown executor fails r=stevendanna a=msbutler

In cockroachdb#110166, the c2c/shutdown test fataled while the job shutdown executor was running, yet the test kept running for quite a while because the goroutine managing the c2c job had not realized the test had failed. This patch refactors the c2c/shutdown tests so that when the job shutdown executor detects a failure, it cancels the context used by the goroutine managing the c2c job.
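
The coordination pattern, as a hedged sketch (hypothetical helper names; the real test wires this through the roachtest harness):

package c2csketch

import (
	"context"
	"fmt"
)

// runShutdownTest runs the c2c job driver in a goroutine while the shutdown
// executor runs in the foreground. If the executor fails, we cancel the
// driver's context instead of letting it run on against a dead test.
func runShutdownTest(
	ctx context.Context,
	runJob func(context.Context) error,
	execShutdown func() error,
) error {
	jobCtx, cancelJob := context.WithCancel(ctx)
	defer cancelJob()

	errCh := make(chan error, 1)
	go func() { errCh <- runJob(jobCtx) }()

	if err := execShutdown(); err != nil {
		cancelJob() // stop the job driver promptly
		<-errCh     // wait for the driver goroutine to unwind
		return fmt.Errorf("shutdown executor failed: %w", err)
	}
	return <-errCh
}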

Informs cockroachdb#110166

Release note: none

110329: rangefeed: reuse annotated context in `ScheduledProcessor.process()` r=erikgrinaker a=erikgrinaker

Context construction is expensive enough to show up in CPU profiles. With 20k rangefeeds/node on an idle cluster, this made up 1% of overall CPU usage, or 4% of rangefeed scheduler CPU usage.
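
A sketch of the fix (illustrative types; the actual change is in `ScheduledProcessor.process()`): annotate the context once at construction and reuse it on every scheduler callback, rather than rebuilding it per event.

package rangefeedsketch

import (
	"context"

	"github.com/cockroachdb/logtags"
)

type processor struct {
	// taggedCtx is built once, carrying the processor's log tags.
	taggedCtx context.Context
}

func newProcessor(ctx context.Context, rangeID int64) *processor {
	// Annotating here, instead of inside process(), avoids a context
	// allocation on every callback.
	return &processor{taggedCtx: logtags.AddTag(ctx, "r", rangeID)}
}

func (p *processor) process(fn func(context.Context)) {
	fn(p.taggedCtx) // reuse the annotated context
}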

Epic: none
Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
4 people committed Sep 11, 2023
4 parents dc0aaba + 526100a + 01db2d8 + 6d136d8 commit 0d073f4
Showing 16 changed files with 340 additions and 81 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/cdcutils/throttle.go
@@ -12,7 +12,6 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"sync"
"time"

@@ -69,7 +68,7 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error {
func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) {
setLimits := func(rl *quotapool.RateLimiter, rate, burst float64) {
// set rateBudget to unlimited if rate is 0.
rateBudget := quotapool.Limit(math.MaxInt64)
rateBudget := quotapool.Inf()
if rate > 0 {
rateBudget = quotapool.Limit(rate)
}
71 changes: 45 additions & 26 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -85,6 +85,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randident"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -7142,47 +7143,65 @@ func TestChangefeedEndTimeWithCursor(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
endTimeReached := make(chan struct{})
knobs.FeedKnobs.EndTimeReached = func() bool {
select {
case <-endTimeReached:
return true
default:
return false
}
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, "CREATE TABLE foo (a INT PRIMARY KEY)")
sqlDB.Exec(t, "INSERT INTO foo VALUES (1), (2), (3)")

var tsCursor string
sqlDB.QueryRow(t, "SELECT (cluster_logical_timestamp())").Scan(&tsCursor)
sqlDB.Exec(t, "INSERT INTO foo VALUES (4), (5), (6)")

fakeEndTime := s.Server.Clock().Now().Add(int64(time.Hour), 0).AsOfSystemTime()
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan", tsCursor, fakeEndTime)
defer closeFeed(t, feed)
// Insert 1k rows -- using separate statements to get different MVCC timestamps.
for i := 0; i < 1024; i++ {
sqlDB.Exec(t, "INSERT INTO foo VALUES ($1)", i)
}

assertPayloads(t, feed, []string{
`foo: [4]->{"after": {"a": 4}}`,
`foo: [5]->{"after": {"a": 5}}`,
`foo: [6]->{"after": {"a": 6}}`,
})
close(endTimeReached)
// Split table into multiple ranges to make things more interesting.
sqlDB.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (100), (200), (400), (800)")

knobs := s.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
fooSpan := func() roachpb.Span {
fooDesc := desctestutils.TestingGetPublicTableDescriptor(
s.Server.DB(), s.Codec, "d", "foo")
return fooDesc.PrimaryIndexSpan(s.Codec)
}()

// Capture resolved events emitted during the changefeed. We expect
// every range to emit a resolved event with the end_time timestamp.
frontier, err := span.MakeFrontier(fooSpan)
require.NoError(t, err)
knobs.FilterSpanWithMutation = func(rs *jobspb.ResolvedSpan) (bool, error) {
_, err := frontier.Forward(rs.Span, rs.Timestamp)
return false, err
}

// endTime must be after the creation time (5 seconds should be enough
// for the CREATE CHANGEFEED statement to be reached and processed).
endTime := s.Server.Clock().Now().AddDuration(5 * time.Second)
feed := feed(t, f, "CREATE CHANGEFEED FOR foo WITH cursor = $1, end_time = $2, no_initial_scan",
tsCursor, eval.TimestampToDecimalDatum(endTime).String())
defer closeFeed(t, feed)

// We don't care much about the values emitted (tested elsewhere) -- all
// we want to verify is that the feed terminates.
testFeed := feed.(cdctest.EnterpriseTestFeed)
require.NoError(t, testFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusSucceeded
}))

// After the changefeed completes, verify we have seen every range emit a
// resolved event with the end_time timestamp. That is: verify frontier.Frontier() is at end_time.
expectedFrontier := endTime.Prev()
testutils.SucceedsWithin(t, func() error {
if expectedFrontier.EqOrdering(frontier.Frontier()) {
return nil
}
return errors.Newf("still waiting for frontier to reach %s, current %s",
expectedFrontier, frontier.Frontier())
}, 5*time.Second)
}

// TODO: Fix sinkless feeds not providing pre-close events if Next is called
// after the feed was closed
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvevent/BUILD.bazel
@@ -52,6 +52,7 @@ go_test(
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/util",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
@@ -105,6 +105,7 @@ func newMemBuffer(

b.qp = allocPool{
AbstractPool: quotapool.New("changefeed", quota, opts...),
sv: sv,
metrics: metrics,
}

@@ -218,6 +219,7 @@ func (b *blockingBuffer) enqueue(ctx context.Context, e Event) (err error) {
}

b.metrics.BufferEntriesIn.Inc(1)
b.metrics.BufferEntriesByType[e.et.Index()].Inc(1)
b.mu.queue.enqueue(e)

select {
@@ -246,6 +248,7 @@ func (b *blockingBuffer) AcquireMemory(ctx context.Context, n int64) (alloc Allo
return alloc, err
}
b.metrics.BufferEntriesMemAcquired.Inc(n)
b.metrics.AllocatedMem.Inc(n)
return alloc, nil
}

@@ -324,6 +327,7 @@ func (b *blockingBuffer) CloseWithReason(ctx context.Context, reason error) erro
quota := r.(*memQuota)
quota.closed = true
quota.acc.Close(ctx)
b.metrics.AllocatedMem.Dec(quota.allocated)
return false
})

@@ -442,16 +446,22 @@ func (r *memRequest) ShouldWait() bool {
type allocPool struct {
*quotapool.AbstractPool
metrics *Metrics
sv *settings.Values
}

func (ap allocPool) Release(ctx context.Context, bytes, entries int64) {
if bytes < 0 {
logcrash.ReportOrPanic(ctx, ap.sv, "attempt to release negative bytes (%d) into pool", bytes)
}

ap.AbstractPool.Update(func(r quotapool.Resource) (shouldNotify bool) {
quota := r.(*memQuota)
if quota.closed {
return false
}
quota.acc.Shrink(ctx, bytes)
quota.allocated -= bytes
ap.metrics.AllocatedMem.Dec(bytes)
ap.metrics.BufferEntriesMemReleased.Inc(bytes)
ap.metrics.BufferEntriesReleased.Inc(entries)
return true
67 changes: 60 additions & 7 deletions pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
@@ -15,13 +15,15 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
@@ -32,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -94,9 +97,6 @@ func TestBlockingBuffer(t *testing.T) {
}
st := cluster.MakeTestingClusterSettings()
buf := kvevent.TestingNewMemBuffer(ba, &st.SV, &metrics, notifyWait)
defer func() {
require.NoError(t, buf.CloseWithReason(context.Background(), nil))
}()

producerCtx, stopProducers := context.WithCancel(context.Background())
wg := ctxgroup.WithContext(producerCtx)
@@ -105,26 +105,79 @@
}()

// Start adding KVs to the buffer until we block.
var numResolvedEvents, numKVEvents int
wg.GoCtx(func(ctx context.Context) error {
rnd, _ := randutil.NewTestRand()
for {
err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0)))
if err != nil {
return err
if rnd.Int()%20 == 0 {
prefix := keys.SystemSQLCodec.TablePrefix(42)
sp := roachpb.Span{Key: prefix, EndKey: prefix.Next()}
if err := buf.Add(ctx, kvevent.NewBackfillResolvedEvent(sp, hlc.Timestamp{}, jobspb.ResolvedSpan_BACKFILL)); err != nil {
return err
}
numResolvedEvents++
} else {
if err := buf.Add(ctx, kvevent.MakeKVEvent(makeRangeFeedEvent(rnd, 256, 0))); err != nil {
return err
}
numKVEvents++
}
}
})

<-waitCh
require.NoError(t, timeutil.RunWithTimeout(
context.Background(), "wait", 10*time.Second, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-waitCh:
return nil
}
}))

// Keep consuming events until we get pushback metrics updated.
var numPopped, numFlush int
for metrics.BufferPushbackNanos.Count() == 0 {
e, err := buf.Get(context.Background())
require.NoError(t, err)
a := e.DetachAlloc()
a.Release(context.Background())
numPopped++
if e.Type() == kvevent.TypeFlush {
numFlush++
}
}

// Allocated memory gauge should be non-zero once we buffer some events.
testutils.SucceedsWithin(t, func() error {
if metrics.AllocatedMem.Value() > 0 {
return nil
}
return errors.New("waiting for allocated mem > 0")
}, 5*time.Second)

stopProducers()
require.ErrorIs(t, wg.Wait(), context.Canceled)

require.EqualValues(t, numKVEvents+numResolvedEvents, metrics.BufferEntriesIn.Count())
require.EqualValues(t, numPopped, metrics.BufferEntriesOut.Count())
require.Greater(t, metrics.BufferEntriesMemReleased.Count(), int64(0))

// Flush events are special in that they are ephemeral events that don't get
// counted when releasing (a flush is a 0-entry, 0-byte event).
require.EqualValues(t, numPopped-numFlush, metrics.BufferEntriesReleased.Count())

require.EqualValues(t, numKVEvents, metrics.BufferEntriesByType[kvevent.TypeKV].Count())
require.EqualValues(t, numResolvedEvents, metrics.BufferEntriesByType[kvevent.TypeResolved].Count())

// We might have seen numFlush events, but they are synthetic, and only explicitly enqueued
// flush events are counted.
require.EqualValues(t, 0, metrics.BufferEntriesByType[kvevent.TypeFlush].Count())

// After the buffer is closed, resources are released and metrics are adjusted accordingly.
require.NoError(t, buf.CloseWithReason(context.Background(), context.Canceled))

require.EqualValues(t, 0, metrics.AllocatedMem.Value())
}

func TestBlockingBufferNotifiesConsumerWhenOutOfMemory(t *testing.T) {
29 changes: 25 additions & 4 deletions pkg/ccl/changefeedccl/kvevent/event.go
@@ -77,13 +77,10 @@ type MemAllocator interface {
type Type uint8

const (
// TypeUnknown indicates the event could not be parsed. Will fail the feed.
TypeUnknown Type = iota

// TypeFlush indicates a request to flush buffered data.
// This request type is emitted by blocking buffer when it's blocked, waiting
// for more memory.
TypeFlush
TypeFlush Type = iota

// TypeKV indicates that the KV, PrevKeyValue, and BackfillTimestamp methods
// on the Event meaningful.
Expand All @@ -98,6 +95,9 @@ const (
// TypeResolved indicates that the Resolved method on the Event will be
// meaningful.
TypeResolved = resolvedNone

// number of event types.
numEventTypes = TypeResolved + 1
)

// Event represents an event emitted by a kvfeed. It is either a KV or a
@@ -120,6 +120,27 @@ func (e *Event) Type() Type {
}
}

// Index returns a numerical/ordinal type index suitable for indexing into arrays.
func (t Type) Index() int {
switch t {
case TypeFlush:
return int(TypeFlush)
case TypeKV:
return int(TypeKV)
case TypeResolved, resolvedBackfill, resolvedRestart, resolvedExit:
return int(TypeResolved)
default:
log.Warningf(context.TODO(),
"returning TypeFlush boundary type for unknown event type %d", t)
return int(TypeFlush)
}
}

// Raw returns the underlying RangeFeedEvent.
func (e *Event) Raw() *kvpb.RangeFeedEvent {
return e.ev
}

// ApproximateSize returns the event's approximate size in bytes.
func (e *Event) ApproximateSize() int {
if e.et == TypeFlush {