From 187cab2fa48928dec7656637bba3397b7ff869f3 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Wed, 6 Oct 2021 12:04:35 -0400 Subject: [PATCH 1/2] rangefeedbuffer: introduce a rangefeed buffer In both #69614 and #69661 we find ourselves reaching for a thin, memory-bounded buffer to sit on top of a rangefeed. We want to be able to accumulate raw rangefeed events, and when the rangefeed frontier is bumped, flush everything out en masse in timestamp sorted order. Package rangefeedbuffer introduces such a data structure. If we're past the configured memory limit before having observed a frontier bump, we have little recourse -- since we receive checkpoints and rangefeed values on the same stream, we're waiting for a future checkpoint to flush out the accumulated memory. Given this, we simply error out to the caller (expecting them to re-establish the rangefeed). Release note: None Co-authored-by: Arul Ajmani --- pkg/BUILD.bazel | 1 + .../rangefeed/rangefeed_external_test.go | 112 +++++++++++++++ .../rangefeed/rangefeedbuffer/BUILD.bazel | 45 ++++++ .../rangefeed/rangefeedbuffer/buffer.go | 102 ++++++++++++++ .../rangefeed/rangefeedbuffer/buffer_test.go | 131 ++++++++++++++++++ 5 files changed, 391 insertions(+) create mode 100644 pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel create mode 100644 pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go create mode 100644 pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 40dcdd4da6e7..296c80cb4d4d 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -119,6 +119,7 @@ ALL_TESTS = [ "//pkg/kv/bulk:bulk_test", "//pkg/kv/kvclient/kvcoord:kvcoord_test", "//pkg/kv/kvclient/rangecache:rangecache_test", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test", "//pkg/kv/kvclient/rangefeed:rangefeed_test", "//pkg/kv/kvnemesis:kvnemesis_test", "//pkg/kv/kvprober:kvprober_test", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go 
b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index 4a2dd5f7b987..e31ed8ad8465 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -333,3 +333,115 @@ func TestWithOnCheckpoint(t *testing.T) { wg.Wait() } + +// TestRangefeedValueTimestamps tests that the rangefeed values (and previous +// values) have the kind of timestamps we expect when writing, overwriting, and +// deleting keys. +func TestRangefeedValueTimestamps(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + + db := tc.Server(0).DB() + scratchKey := tc.ScratchRange(t) + scratchKey = scratchKey[:len(scratchKey):len(scratchKey)] + mkKey := func(k string) roachpb.Key { + return encoding.EncodeStringAscending(scratchKey, k) + } + + sp := roachpb.Span{ + Key: scratchKey, + EndKey: scratchKey.PrefixEnd(), + } + { + // Enable rangefeeds, otherwise the thing will retry until they are enabled. + _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true") + require.NoError(t, err) + } + { + // Lower the closed timestamp target duration to speed up the test. 
+ _, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") + require.NoError(t, err) + } + + f, err := rangefeed.NewFactory(tc.Stopper(), db, nil) + require.NoError(t, err) + + rows := make(chan *roachpb.RangeFeedValue) + r, err := f.RangeFeed(ctx, "test", sp, db.Clock().Now(), + func(ctx context.Context, value *roachpb.RangeFeedValue) { + select { + case rows <- value: + case <-ctx.Done(): + } + }, + rangefeed.WithDiff(), + ) + require.NoError(t, err) + defer r.Close() + + mustGetInt := func(value roachpb.Value) int { + val, err := value.GetInt() + require.NoError(t, err) + return int(val) + } + + { + beforeWriteTS := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("a"), 1)) + afterWriteTS := db.Clock().Now() + + v := <-rows + require.Equal(t, mustGetInt(v.Value), 1) + require.True(t, beforeWriteTS.Less(v.Value.Timestamp)) + require.True(t, v.Value.Timestamp.Less(afterWriteTS)) + + require.False(t, v.PrevValue.IsPresent()) + } + + { + beforeOverwriteTS := db.Clock().Now() + require.NoError(t, db.Put(ctx, mkKey("a"), 2)) + afterOverwriteTS := db.Clock().Now() + + v := <-rows + require.Equal(t, mustGetInt(v.Value), 2) + require.True(t, beforeOverwriteTS.Less(v.Value.Timestamp)) + require.True(t, v.Value.Timestamp.Less(afterOverwriteTS)) + + require.True(t, v.PrevValue.IsPresent()) + require.Equal(t, mustGetInt(v.PrevValue), 1) + require.True(t, v.PrevValue.Timestamp.IsEmpty()) + } + + { + beforeDelTS := db.Clock().Now() + require.NoError(t, db.Del(ctx, mkKey("a"))) + afterDelTS := db.Clock().Now() + + v := <-rows + require.False(t, v.Value.IsPresent()) + require.True(t, beforeDelTS.Less(v.Value.Timestamp)) + require.True(t, v.Value.Timestamp.Less(afterDelTS)) + + require.True(t, v.PrevValue.IsPresent()) + require.Equal(t, mustGetInt(v.PrevValue), 2) + require.True(t, v.PrevValue.Timestamp.IsEmpty()) + } + + { + beforeDelTS := db.Clock().Now() + require.NoError(t, db.Del(ctx, mkKey("a"))) + afterDelTS := 
db.Clock().Now() + + v := <-rows + require.False(t, v.Value.IsPresent()) + require.True(t, beforeDelTS.Less(v.Value.Timestamp)) + require.True(t, v.Value.Timestamp.Less(afterDelTS)) + + require.False(t, v.PrevValue.IsPresent()) + require.True(t, v.PrevValue.Timestamp.IsEmpty()) + } +} diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel new file mode 100644 index 000000000000..17b3432a9035 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel @@ -0,0 +1,45 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "buffer", + srcs = ["buffer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/buffer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/jobs/jobspb", + "//pkg/roachpb:with-mocks", + "//pkg/settings", + "//pkg/util/hlc", + "//pkg/util/log/logcrash", + "//pkg/util/mon", + "//pkg/util/quotapool", + "//pkg/util/syncutil", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_library( + name = "rangefeedbuffer", + srcs = ["buffer.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "rangefeedbuffer_test", + srcs = ["buffer_test.go"], + deps = [ + ":rangefeedbuffer", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go new file mode 100644 index 000000000000..bc54e4b9d2b0 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go @@ -0,0 +1,102 @@ +// Copyright 2021 The Cockroach Authors. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeedbuffer + +import ( + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// ErrBufferLimitExceeded is returned by the buffer when attempting to add more +// events than the limit the buffer is configured with. +var ErrBufferLimitExceeded = errors.New("buffer limit exceeded") + +// Event is the unit of what can be added to the buffer. +type Event interface { + Timestamp() hlc.Timestamp +} + +// Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It +// accumulates raw events which can then be flushed out in timestamp sorted +// order en-masse whenever the rangefeed frontier is bumped. If we accumulate +// more events than the limit allows for, we error out to the caller. +type Buffer struct { + limit int + + mu struct { + syncutil.Mutex + + events + frontier hlc.Timestamp + } +} + +// New constructs a Buffer with the provided limit. +func New(limit int) *Buffer { + return &Buffer{limit: limit} +} + +// Add adds the given entry to the buffer. +func (b *Buffer) Add(ctx context.Context, ev Event) error { + b.mu.Lock() + defer b.mu.Unlock() + + if ev.Timestamp().LessEq(b.mu.frontier) { + // If the entry is at a timestamp less than or equal to our last known + // frontier, we can discard it. 
+ return nil + } + + if b.mu.events.Len()+1 > b.limit { + return ErrBufferLimitExceeded + } + + b.mu.events = append(b.mu.events, ev) + return nil +} + +// Flush returns the timestamp sorted list of accumulated events with timestamps +// less than or equal to the provided frontier timestamp. The timestamp is +// recorded (expected to monotonically increase), and future events with +// timestamps less than or equal to it are discarded. +func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) { + b.mu.Lock() + defer b.mu.Unlock() + + if frontier.Less(b.mu.frontier) { + log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier) + } + + // Accumulate all events with timestamps <= the given timestamp in sorted + // order. + sort.Sort(&b.mu.events) + idx := sort.Search(len(b.mu.events), func(i int) bool { + return !b.mu.events[i].Timestamp().LessEq(frontier) + }) + + events = b.mu.events[:idx] + b.mu.events = b.mu.events[idx:] + b.mu.frontier = frontier + return events +} + +type events []Event + +var _ sort.Interface = (*events)(nil) + +func (es *events) Len() int { return len(*es) } +func (es *events) Less(i, j int) bool { return (*es)[i].Timestamp().Less((*es)[j].Timestamp()) } +func (es *events) Swap(i, j int) { (*es)[i], (*es)[j] = (*es)[j], (*es)[i] } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go new file mode 100644 index 000000000000..f729a96705e3 --- /dev/null +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go @@ -0,0 +1,131 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package rangefeedbuffer_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestBuffer ensures that buffer addition and flushing semantics work as +// expected. +func TestBuffer(t *testing.T) { + defer leaktest.AfterTest(t)() + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{WallTime: int64(nanos)} + } + + ctx := context.Background() + const limit = 25 + buffer := rangefeedbuffer.New(limit) + + { // Sanity check the newly initialized rangefeed buffer. + events := buffer.Flush(ctx, ts(0)) + require.True(t, len(events) == 0) + } + + { // Ensure that checkpoint events are discarded. + events := buffer.Flush(ctx, ts(5)) + require.True(t, len(events) == 0) + } + + { // Bumping the frontier timestamp without existing events should behave expectedly. + events := buffer.Flush(ctx, ts(10)) + require.True(t, len(events) == 0) + } + + { // Flushing at a timestamp lower than buffered events should return nothing. + require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13 + require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11 + require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15 + require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12 + require.NoError(t, buffer.Add(ctx, makeEvent("e", ts(18)))) // e@18 + + events := buffer.Flush(ctx, ts(10)) + require.True(t, len(events) == 0) + } + + { // Flushing should capture only entries with timestamps <= the frontier, in sorted order. 
+ events := buffer.Flush(ctx, ts(14)) + + require.True(t, len(events) == 3) + require.Equal(t, events[0].(*testEvent).data, "b") // b@11 + require.Equal(t, events[1].(*testEvent).data, "d") // d@12 + require.Equal(t, events[2].(*testEvent).data, "a") // a@13 + } + + { // Incremental advances should only surface the events until the given timestamp. + events := buffer.Flush(ctx, ts(15)) + + require.True(t, len(events) == 1) + require.Equal(t, events[0].(*testEvent).data, "c") // c@15 + } + + { // Adding events with timestamps <= the last flush are discarded. + require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13 + require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11 + require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15 + require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12 + + events := buffer.Flush(ctx, ts(15)) + require.True(t, len(events) == 0) + } + + { // Additional events are flushed out at appropriate points. + require.NoError(t, buffer.Add(ctx, makeEvent("f", ts(19)))) // f@19 + require.NoError(t, buffer.Add(ctx, makeEvent("g", ts(21)))) // g@21 + + events := buffer.Flush(ctx, ts(20)) + require.True(t, len(events) == 2) + require.Equal(t, events[0].(*testEvent).data, "e") // e@18 + require.Equal(t, events[1].(*testEvent).data, "f") // f@19 + } + + { // Ensure that a timestamp greater than any previous event flushes everything. + events := buffer.Flush(ctx, ts(100)) + require.True(t, len(events) == 1) + require.Equal(t, events[0].(*testEvent).data, "g") // g@21 + } + + { // Sanity check that there are no events left over. + events := buffer.Flush(ctx, ts(100)) + require.True(t, len(events) == 0) + } + + { // Ensure that buffer limits are respected. 
+ for i := 0; i < limit; i++ { + require.NoError(t, buffer.Add(ctx, makeEvent("x", ts(101)))) // x@101 + } + + err := buffer.Add(ctx, makeEvent("x", ts(101))) + require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded) + } +} + +type testEvent struct { + data string + ts hlc.Timestamp +} + +func (t *testEvent) Timestamp() hlc.Timestamp { + return t.ts +} + +var _ rangefeedbuffer.Event = &testEvent{} + +func makeEvent(data string, ts hlc.Timestamp) rangefeedbuffer.Event { + return &testEvent{data: data, ts: ts} +} From d93e7cfd54704399cfc6dd4b4dbb468620103973 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Thu, 14 Oct 2021 17:40:36 -0400 Subject: [PATCH 2/2] rangefeed: omit timestamps on previous values There was a discrepancy in whether or not the previous value contained the timestamp of when that value was recorded. With the initial scan, we populated the timestamp of the initial scan itself. With regular diffs, we omitted the timestamp. This made for confusing semantics (well, confusing at least for this author). Release note: None --- pkg/kv/kvclient/rangefeed/rangefeed.go | 1 + pkg/kv/kvclient/rangefeed/rangefeed_external_test.go | 3 ++- pkg/roachpb/api.pb.go | 1 + pkg/roachpb/api.proto | 1 + 4 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 83e9c88750e7..8b3e25d4de48 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -275,6 +275,7 @@ func (f *RangeFeed) maybeRunInitialScan( // indicating that the value was previously deleted. 
if f.withDiff { v.PrevValue = v.Value + v.PrevValue.Timestamp = hlc.Timestamp{} } // It's something of a bummer that we must allocate a new value for each diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go index e31ed8ad8465..3d2d0703b065 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_external_test.go @@ -88,8 +88,9 @@ func TestRangeFeedIntegration(t *testing.T) { v1 := <-rows require.Equal(t, mkKey("a"), v1.Key) // Ensure the initial scan contract is fulfilled when WithDiff is specified. - require.Equal(t, v1.Value, v1.PrevValue) + require.Equal(t, v1.Value.RawBytes, v1.PrevValue.RawBytes) require.Equal(t, v1.Value.Timestamp, afterB) + require.True(t, v1.PrevValue.Timestamp.IsEmpty()) } { v2 := <-rows diff --git a/pkg/roachpb/api.pb.go b/pkg/roachpb/api.pb.go index e13f4041dd0c..18b4015caae7 100644 --- a/pkg/roachpb/api.pb.go +++ b/pkg/roachpb/api.pb.go @@ -7130,6 +7130,7 @@ type RangeFeedValue struct { // 1. with_diff was passed in the corresponding RangeFeedRequest. // 2. the key-value was present and not a deletion tombstone before // this event. + // The timestamp on the previous value is empty. PrevValue Value `protobuf:"bytes,3,opt,name=prev_value,json=prevValue,proto3" json:"prev_value"` } diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index 248e4ce8cb1b..6e6eadf491bb 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2385,6 +2385,7 @@ message RangeFeedValue { // 1. with_diff was passed in the corresponding RangeFeedRequest. // 2. the key-value was present and not a deletion tombstone before // this event. + // The timestamp on the previous value is empty. Value prev_value = 3 [(gogoproto.nullable) = false]; }