diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 736c629d4a9e..4544ddf26d8c 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -119,6 +119,7 @@ ALL_TESTS = [ "//pkg/kv/bulk:bulk_test", "//pkg/kv/kvclient/kvcoord:kvcoord_test", "//pkg/kv/kvclient/rangecache:rangecache_test", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test", "//pkg/kv/kvclient/rangefeed:rangefeed_test", "//pkg/kv/kvnemesis:kvnemesis_test", "//pkg/kv/kvprober:kvprober_test", diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel new file mode 100644 index 000000000000..14794c78d15d --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel @@ -0,0 +1,47 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "buffer", + srcs = ["buffer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/buffer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/jobs/jobspb", + "//pkg/roachpb:with-mocks", + "//pkg/settings", + "//pkg/util/hlc", + "//pkg/util/log/logcrash", + "//pkg/util/mon", + "//pkg/util/quotapool", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_library( + name = "rangefeedbuffer", + srcs = ["buffer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb:with-mocks", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "rangefeedbuffer_test", + srcs = ["buffer_test.go"], + deps = [ + ":rangefeedbuffer", + "//pkg/roachpb:with-mocks", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go new file mode 100644 index 000000000000..cdc8f1e9637b --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go @@ -0,0 +1,157 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeedbuffer + +import ( + "container/heap" + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// ErrBufferLimitExceeded is returned by the buffer when attempting to add more +// events than the limit the buffer is configured with. +var ErrBufferLimitExceeded = errors.New("rangefeed buffer limit exceeded") + +// Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It +// accumulates raw rangefeed events[1], which can be flushed out in timestamp +// sorted order en-masse whenever the rangefeed frontier is bumped. If we +// accumulate more events than the limit allows for, we error out to the caller. +// +// [1]: Rangefeed error events are propagated to the caller, checkpoint events +// are discarded. +// +// TODO(irfansharif): We could also de-bounce values with the same timestamp, +// instead of letting the caller handle it themselves. +type Buffer struct { + limit int + + mu struct { + syncutil.Mutex + entryHeap + frontier hlc.Timestamp + } +} + +// New constructs a Buffer with the provided limit. +func New(limit int) *Buffer { + return &Buffer{limit: limit} +} + +// Add adds the given rangefeed entry to the buffer. +func (b *Buffer) Add(ctx context.Context, ev *roachpb.RangeFeedEvent) error { + if ev.Error != nil { + return ev.Error.Error.GoError() + } + + if checkpoint := ev.Checkpoint; checkpoint != nil { + // We don't care about rangefeed checkpoint events; discard. + return nil + + // TODO(irfansharif): Should we instead accumulate these events as well, + // with the resolved timestamps? + } + + value := ev.Val.Value + if !value.IsPresent() { + value = ev.Val.PrevValue + } + + b.mu.Lock() + defer b.mu.Unlock() + + if value.Timestamp.LessEq(b.mu.frontier) { + // If the rangefeed entry is at a timestamp less than or equal to our + // last known checkpoint, we don't need to record it. + return nil + } + + if b.mu.entryHeap.Len()+1 > b.limit { + return ErrBufferLimitExceeded + } + heap.Push(&b.mu.entryHeap, &entry{RangeFeedEvent: ev, Timestamp: value.Timestamp}) + + return nil +} + +// Flush returns the timestamp sorted list of accumulated events with timestamps +// less than or equal to the one provided (typically the rangefeed frontier +// timestamp, whenever it's advanced). The timestamp is recorded, and future +// events with timestamps less than or equal to it are discarded. +func (b *Buffer) Flush( + ctx context.Context, frontier hlc.Timestamp, +) (events []*roachpb.RangeFeedEvent) { + b.mu.Lock() + defer b.mu.Unlock() + + if frontier.Less(b.mu.frontier) { + log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier) + } + + // Accumulate all events with timestamps <= the given timestamp. + for { + if len(b.mu.entryHeap) == 0 || !b.mu.entryHeap[0].Timestamp.LessEq(frontier) { + break + } + + ev := heap.Pop(&b.mu.entryHeap).(*entry) + events = append(events, ev.RangeFeedEvent) + } + + b.mu.frontier = frontier + return events +} + +type entry struct { + *roachpb.RangeFeedEvent + hlc.Timestamp +} + +// entryHeap is a min heap for rangefeed events and their corresponding +// timestamps. +type entryHeap []*entry + +var _ heap.Interface = (*entryHeap)(nil) + +// Len is part of heap.Interface. +func (ih *entryHeap) Len() int { + return len(*ih) +} + +// Less is part of heap.Interface. +func (ih *entryHeap) Less(i, j int) bool { + return (*ih)[i].Timestamp.Less((*ih)[j].Timestamp) +} + +// Swap is part of heap.Interface. +func (ih *entryHeap) Swap(i, j int) { + (*ih)[i], (*ih)[j] = (*ih)[j], (*ih)[i] +} + +// Push is part of heap.Interface. +func (ih *entryHeap) Push(x interface{}) { + item := x.(*entry) + *ih = append(*ih, item) +} + +// Pop is part of heap.Interface. +func (ih *entryHeap) Pop() interface{} { + old := *ih + n := len(old) + item := old[n-1] + old[n-1] = nil + *ih = old[0 : n-1] + return item +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go new file mode 100644 index 000000000000..e6e011d54d8e --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go @@ -0,0 +1,162 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeedbuffer_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestBuffer ensures that buffer addition and flushing semantics work as +// expected. +func TestBuffer(t *testing.T) { + defer leaktest.AfterTest(t)() + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{WallTime: int64(nanos)} + } + span := func(start, end string) roachpb.Span { + return roachpb.Span{Key: roachpb.Key(start), EndKey: roachpb.Key(end)} + } + key := func(k string) roachpb.Key { + return roachpb.Key(k) + } + value := func(v string, ts hlc.Timestamp) roachpb.Value { + return roachpb.Value{ + RawBytes: []byte(v), + Timestamp: ts, + } + } + + ctx := context.Background() + const limit = 25 + buffer := rangefeedbuffer.New(limit) + + { // Sanity check the newly initialized rangefeed buffer. + events := buffer.Flush(ctx, ts(0)) + require.True(t, len(events) == 0) + } + + // Check that adding rangefeed error events error out. + require.Error(t, buffer.Add(ctx, MakeRangeFeedError(t, roachpb.Error{}))) + + { // Ensure that checkpoint events are discarded. + require.NoError(t, buffer.Add(ctx, MakeRangeFeedCheckpoint(t, span("a", "z"), ts(1)))) + events := buffer.Flush(ctx, ts(5)) + require.True(t, len(events) == 0) + } + + { // Bumping the frontier timestamp without existing events should behave expectedly. + events := buffer.Flush(ctx, ts(10)) + require.True(t, len(events) == 0) + } + + { // Flushing at a timestamp lower than buffered events should return nothing. + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("a"), value("a", ts(13))))) // a@13 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("b"), value("b", ts(11))))) // b@11 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("c"), value("c", ts(15))))) // c@15 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("d"), value("d", ts(12))))) // d@12 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("d"), value("e", ts(18))))) // e@18 + + events := buffer.Flush(ctx, ts(10)) + require.True(t, len(events) == 0) + } + + { // Flushing should capture only entries with timestamps <= the frontier, in sorted order. + require.NoError(t, buffer.Add(ctx, MakeRangeFeedCheckpoint(t, span("a", "z"), ts(14)))) + events := buffer.Flush(ctx, ts(14)) + + require.True(t, len(events) == 3) + require.Equal(t, events[0].Val.Value, value("b", ts(11))) // b@11 + require.Equal(t, events[1].Val.Value, value("d", ts(12))) // d@12 + require.Equal(t, events[2].Val.Value, value("a", ts(13))) // a@13 + } + + { // Incremental advances should only surface the events until the given timestamp. + events := buffer.Flush(ctx, ts(15)) + + require.True(t, len(events) == 1) + require.Equal(t, events[0].Val.Value, value("c", ts(15))) // c@15 + } + + { // Adding events with timestamps <= the last flush are discarded. + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("a"), value("a", ts(13))))) // a@13 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("b"), value("b", ts(11))))) // b@11 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("c"), value("c", ts(15))))) // c@15 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("d"), value("d", ts(12))))) // d@12 + + events := buffer.Flush(ctx, ts(15)) + require.True(t, len(events) == 0) + } + + { // Additional events are flushed out at appropriate points. + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("f"), value("f", ts(19))))) // f@19 + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("g"), value("g", ts(21))))) // g@21 + + events := buffer.Flush(ctx, ts(20)) + require.True(t, len(events) == 2) + require.Equal(t, events[0].Val.Value, value("e", ts(18))) // e@18 + require.Equal(t, events[1].Val.Value, value("f", ts(19))) // f@19 + } + + { // Ensure that a timestamp greater than any previous event flushes everything. + events := buffer.Flush(ctx, ts(100)) + require.True(t, len(events) == 1) + require.Equal(t, events[0].Val.Value, value("g", ts(21))) // g@21 + } + + { // Sanity check that there are no events left over. + events := buffer.Flush(ctx, ts(100)) + require.True(t, len(events) == 0) + } + + { // Ensure that buffer limits are respected. + for i := 0; i < limit; i++ { + require.NoError(t, buffer.Add(ctx, MakeRangeFeedValue(t, key("x"), value("x", ts(101))))) // x@101 + } + + err := buffer.Add(ctx, MakeRangeFeedValue(t, key("x"), value("x", ts(101)))) + require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded) + } +} + +func MakeRangeFeedCheckpoint( + t *testing.T, span roachpb.Span, ts hlc.Timestamp, +) *roachpb.RangeFeedEvent { + return MakeRangeFeedEvent(t, &roachpb.RangeFeedCheckpoint{Span: span, ResolvedTS: ts}) +} + +func MakeRangeFeedError(t *testing.T, err roachpb.Error) *roachpb.RangeFeedEvent { + return MakeRangeFeedEvent(t, &roachpb.RangeFeedError{Error: err}) +} + +func MakeRangeFeedValue(t *testing.T, key roachpb.Key, val roachpb.Value) *roachpb.RangeFeedEvent { + return MakeRangeFeedValueWithPrev(t, key, val, roachpb.Value{}) +} + +func MakeRangeFeedValueWithPrev( + t *testing.T, key roachpb.Key, val, prev roachpb.Value, +) *roachpb.RangeFeedEvent { + return MakeRangeFeedEvent(t, &roachpb.RangeFeedValue{Key: key, Value: val, PrevValue: prev}) +} + +func MakeRangeFeedEvent(t *testing.T, val interface{}) *roachpb.RangeFeedEvent { + var ev roachpb.RangeFeedEvent + if !ev.SetValue(val) { + t.Fatalf("unable to set value: %v", val) + } + return &ev +}