Skip to content

Commit

Permalink
rangefeedbuffer: introduce a rangefeed buffer
Browse files Browse the repository at this point in the history
In both #69614 and #69661 we find ourselves reaching for a thin,
memory-bounded buffer to sit on top a rangefeed. We want to be able to
accumulate raw rangefeed events, and when the rangefeed frontier is
bumped, flush everything out en-masse in timestamp sorted order. Package
rangefeedbuffer introduces such a datastructure.

If we're past the configured memory limit before having observed a
frontier bump, we have little recourse -- since we receive checkpoints and
rangefeed values on the same stream, we're waiting for a future
checkpoint to flush out the accumulated memory. Given this, we simply
error out to the caller (expecting them to re-establish the rangefeed).

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
  • Loading branch information
irfansharif and arulajmani committed Oct 12, 2021
1 parent 44752c4 commit efc15c5
Show file tree
Hide file tree
Showing 4 changed files with 367 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ ALL_TESTS = [
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_test",
"//pkg/kv/kvclient/rangecache:rangecache_test",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test",
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober_test",
Expand Down
47 changes: 47 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "buffer",
srcs = ["buffer.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/buffer",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/jobs/jobspb",
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/util/hlc",
"//pkg/util/log/logcrash",
"//pkg/util/mon",
"//pkg/util/quotapool",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_library(
name = "rangefeedbuffer",
srcs = ["buffer.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "rangefeedbuffer_test",
srcs = ["buffer_test.go"],
deps = [
":rangefeedbuffer",
"//pkg/roachpb:with-mocks",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
],
)
157 changes: 157 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeedbuffer

import (
"container/heap"
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// ErrBufferLimitExceeded is returned by the buffer when attempting to add more
// events than the limit the buffer is configured with.
var ErrBufferLimitExceeded = errors.New("rangefeed buffer limit exceeded")

// Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It
// accumulates raw rangefeed events[1], which can be flushed out in timestamp
// sorted order en-masse whenever the rangefeed frontier is bumped. If we
// accumulate more events than the limit allows for, we error out to the caller.
//
// [1]: Rangefeed error events are propagated to the caller, checkpoint events
// are discarded.
//
// TODO(irfansharif): We could also de-bounce values with the same timestamp,
// instead of letting the caller handle it themselves.
type Buffer struct {
limit int

mu struct {
syncutil.Mutex
entryHeap
frontier hlc.Timestamp
}
}

// New constructs a Buffer with the provided limit.
func New(limit int) *Buffer {
return &Buffer{limit: limit}
}

// Add adds the given rangefeed entry to the buffer.
func (b *Buffer) Add(ctx context.Context, ev *roachpb.RangeFeedEvent) error {
if ev.Error != nil {
return ev.Error.Error.GoError()
}

if checkpoint := ev.Checkpoint; checkpoint != nil {
// We don't care about rangefeed checkpoint events; discard.
return nil

// TODO(irfansharif): Should we instead accumulate these events as well,
// with the resolved timestamps?
}

value := ev.Val.Value
if !value.IsPresent() {
value = ev.Val.PrevValue
}

b.mu.Lock()
defer b.mu.Unlock()

if value.Timestamp.LessEq(b.mu.frontier) {
// If the rangefeed entry is at a timestamp less than or equal to our
// last known checkpoint, we don't need to record it.
return nil
}

if b.mu.entryHeap.Len()+1 > b.limit {
return ErrBufferLimitExceeded
}
heap.Push(&b.mu.entryHeap, &entry{RangeFeedEvent: ev, Timestamp: value.Timestamp})

return nil
}

// Flush returns the timestamp sorted list of accumulated events with timestamps
// less than or equal to the one provided (typically the rangefeed frontier
// timestamp, whenever it's advanced). The timestamp is recorded, and future
// events with timestamps less than or equal to it are discarded.
func (b *Buffer) Flush(
ctx context.Context, frontier hlc.Timestamp,
) (events []*roachpb.RangeFeedEvent) {
b.mu.Lock()
defer b.mu.Unlock()

if frontier.Less(b.mu.frontier) {
log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier)
}

// Accumulate all events with timestamps <= the given timestamp.
for {
if len(b.mu.entryHeap) == 0 || !b.mu.entryHeap[0].Timestamp.LessEq(frontier) {
break
}

ev := heap.Pop(&b.mu.entryHeap).(*entry)
events = append(events, ev.RangeFeedEvent)
}

b.mu.frontier = frontier
return events
}

type entry struct {
*roachpb.RangeFeedEvent
hlc.Timestamp
}

// entryHeap is a min heap for rangefeed events and their corresponding
// timestamps.
type entryHeap []*entry

var _ heap.Interface = (*entryHeap)(nil)

// Len is part of heap.Interface.
func (ih *entryHeap) Len() int {
return len(*ih)
}

// Less is part of heap.Interface.
func (ih *entryHeap) Less(i, j int) bool {
return (*ih)[i].Timestamp.Less((*ih)[j].Timestamp)
}

// Swap is part of heap.Interface.
func (ih *entryHeap) Swap(i, j int) {
(*ih)[i], (*ih)[j] = (*ih)[j], (*ih)[i]
}

// Push is part of heap.Interface.
func (ih *entryHeap) Push(x interface{}) {
item := x.(*entry)
*ih = append(*ih, item)
}

// Pop is part of heap.Interface.
func (ih *entryHeap) Pop() interface{} {
old := *ih
n := len(old)
item := old[n-1]
old[n-1] = nil
*ih = old[0 : n-1]
return item
}
162 changes: 162 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeedbuffer_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestBuffer ensures that buffer addition and flushing semantics work as
// expected.
func TestBuffer(t *testing.T) {
defer leaktest.AfterTest(t)()
ts := func(nanos int) hlc.Timestamp {
return hlc.Timestamp{WallTime: int64(nanos)}
}
span := func(start, end string) roachpb.Span {
return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)}
}
key := func(k string) roachpb.Key {
return roachpb.Key(k)
}
value := func(v string, ts hlc.Timestamp) roachpb.Value {
return roachpb.Value{
RawBytes: []byte(v),
Timestamp: ts,
}
}

ctx := context.Background()
const limit = 25
buffer := rangefeedbuffer.New(limit)

{ // Sanity check the newly initialized rangefeed buffer.
events := buffer.Flush(ctx, ts(0))
require.True(t, len(events) == 0)
}

// Check that adding rangefeed error events error out.
require.Error(t, buffer.Add(ctx, MakeRangeFeedError(t, roachpb.Error{})))

{ // Ensure that checkpoint events are discarded.
require.NoError(t, buffer.Add(ctx, MakeRangeFeedCheckpoint(t, span("a", "z"), ts(1))))
events := buffer.Flush(ctx, ts(5))
require.True(t, len(events) == 0)
}

{ // Bumping the frontier timestamp without existing events should behave expectedly.
events := buffer.Flush(ctx, ts(10))
require.True(t, len(events) == 0)
}

{ // Flushing at a timestamp lower than buffered events should return nothing.
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("a"), value("a", ts(13))))) // a@13
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("b"), value("b", ts(11))))) // b@11
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("c"), value("c", ts(15))))) // c@15
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("d"), value("d", ts(12))))) // d@12
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("d"), value("e", ts(18))))) // e@18

events := buffer.Flush(ctx, ts(10))
require.True(t, len(events) == 0)
}

{ // Flushing should capture only entries with timestamps <= the frontier, in sorted order.
require.NoError(t, buffer.Add(ctx, MakeRangeFeedCheckpoint(t, span("a", "z"), ts(14))))
events := buffer.Flush(ctx, ts(14))

require.True(t, len(events) == 3)
require.Equal(t, events[0].Val.Value, value("b", ts(11))) // b@11
require.Equal(t, events[1].Val.Value, value("d", ts(12))) // d@12
require.Equal(t, events[2].Val.Value, value("a", ts(13))) // a@13
}

{ // Incremental advances should only surface the events until the given timestamp.
events := buffer.Flush(ctx, ts(15))

require.True(t, len(events) == 1)
require.Equal(t, events[0].Val.Value, value("c", ts(15))) // c@15
}

{ // Adding events with timestamps <= the last flush are discarded.
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("a"), value("a", ts(13))))) // a@13
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("b"), value("b", ts(11))))) // b@11
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("c"), value("c", ts(15))))) // c@15
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("d"), value("d", ts(12))))) // d@12

events := buffer.Flush(ctx, ts(15))
require.True(t, len(events) == 0)
}

{ // Additional events are flushed out at appropriate points.
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("f"), value("f", ts(19))))) // f@19
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("g"), value("g", ts(21))))) // g@21

events := buffer.Flush(ctx, ts(20))
require.True(t, len(events) == 2)
require.Equal(t, events[0].Val.Value, value("e", ts(18))) // e@18
require.Equal(t, events[1].Val.Value, value("f", ts(19))) // f@19
}

{ // Ensure that a timestamp greater than any previous event flushes everything.
events := buffer.Flush(ctx, ts(100))
require.True(t, len(events) == 1)
require.Equal(t, events[0].Val.Value, value("g", ts(21))) // g@21
}

{ // Sanity check that there are no events left over.
events := buffer.Flush(ctx, ts(100))
require.True(t, len(events) == 0)
}

{ // Ensure that buffer limits are respected.
for i := 0; i < limit; i++ {
require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("x"), value("x", ts(101))))) // x@101
}

err := buffer.Add(ctx, MakeRangeFeedValue(t, key("x"), value("x", ts(101))))
require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded)
}
}

func MakeRangeFeedCheckpoint(
t *testing.T, span roachpb.Span, ts hlc.Timestamp,
) *roachpb.RangeFeedEvent {
return MakeRangeFeedEvent(t, &roachpb.RangeFeedCheckpoint{Span: span, ResolvedTS: ts})
}

func MakeRangeFeedError(t *testing.T, err roachpb.Error) *roachpb.RangeFeedEvent {
return MakeRangeFeedEvent(t, &roachpb.RangeFeedError{Error: err})
}

func MakeRangeFeedValue(t *testing.T, key roachpb.Key, val roachpb.Value) *roachpb.RangeFeedEvent {
return MakeRangeFeedValueWithPrev(t, key, val, roachpb.Value{})
}

func MakeRangeFeedValueWithPrev(
t *testing.T, key roachpb.Key, val, prev roachpb.Value,
) *roachpb.RangeFeedEvent {
return MakeRangeFeedEvent(t, &roachpb.RangeFeedValue{Key: key, Value: val, PrevValue: prev})
}

func MakeRangeFeedEvent(t *testing.T, val interface{}) *roachpb.RangeFeedEvent {
var ev roachpb.RangeFeedEvent
if !ev.SetValue(val) {
t.Fatalf("unable to set value: %v", val)
}
return &ev
}

0 comments on commit efc15c5

Please sign in to comment.