Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeedbuffer: introduce a rangefeed buffer #71225

Merged
merged 2 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ ALL_TESTS = [
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_test",
"//pkg/kv/kvclient/rangecache:rangecache_test",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test",
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (f *RangeFeed) maybeRunInitialScan(
// indicating that the value was previously deleted.
if f.withDiff {
v.PrevValue = v.Value
v.PrevValue.Timestamp = hlc.Timestamp{}
}

// It's something of a bummer that we must allocate a new value for each
Expand Down
115 changes: 114 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func TestRangeFeedIntegration(t *testing.T) {
v1 := <-rows
require.Equal(t, mkKey("a"), v1.Key)
// Ensure the initial scan contract is fulfilled when WithDiff is specified.
require.Equal(t, v1.Value, v1.PrevValue)
require.Equal(t, v1.Value.RawBytes, v1.PrevValue.RawBytes)
require.Equal(t, v1.Value.Timestamp, afterB)
require.True(t, v1.PrevValue.Timestamp.IsEmpty())
}
{
v2 := <-rows
Expand Down Expand Up @@ -333,3 +334,115 @@ func TestWithOnCheckpoint(t *testing.T) {

wg.Wait()
}

// TestRangefeedValueTimestamps tests that the rangefeed values (and previous
// values) have the kind of timestamps we expect when writing, overwriting, and
// deleting keys.
// TestRangefeedValueTimestamps tests that the rangefeed values (and previous
// values) have the kind of timestamps we expect when writing, overwriting, and
// deleting keys.
func TestRangefeedValueTimestamps(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	db := tc.Server(0).DB()
	scratchKey := tc.ScratchRange(t)
	// Pin the capacity so mkKey's appends never scribble over the scratch key.
	scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]
	mkKey := func(s string) roachpb.Key {
		return encoding.EncodeStringAscending(scratchKey, s)
	}

	span := roachpb.Span{
		Key:    scratchKey,
		EndKey: scratchKey.PrefixEnd(),
	}

	// Enable rangefeeds (otherwise the rangefeed registration retries until
	// they're enabled) and lower the closed timestamp target duration to speed
	// up the test.
	for _, stmt := range []string{
		"SET CLUSTER SETTING kv.rangefeed.enabled = true",
		"SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'",
	} {
		_, err := tc.ServerConn(0).Exec(stmt)
		require.NoError(t, err)
	}

	f, err := rangefeed.NewFactory(tc.Stopper(), db, nil)
	require.NoError(t, err)

	rows := make(chan *roachpb.RangeFeedValue)
	r, err := f.RangeFeed(ctx, "test", span, db.Clock().Now(),
		func(ctx context.Context, value *roachpb.RangeFeedValue) {
			select {
			case rows <- value:
			case <-ctx.Done():
			}
		},
		rangefeed.WithDiff(),
	)
	require.NoError(t, err)
	defer r.Close()

	// mustGetInt decodes the integer payload of a value, failing the test on
	// any decoding error.
	mustGetInt := func(value roachpb.Value) int {
		i, err := value.GetInt()
		require.NoError(t, err)
		return int(i)
	}

	// Initial write: the value carries the MVCC write timestamp and there is
	// no previous value.
	{
		writeStart := db.Clock().Now()
		require.NoError(t, db.Put(ctx, mkKey("a"), 1))
		writeEnd := db.Clock().Now()

		v := <-rows
		require.Equal(t, 1, mustGetInt(v.Value))
		require.True(t, writeStart.Less(v.Value.Timestamp) && v.Value.Timestamp.Less(writeEnd))
		require.False(t, v.PrevValue.IsPresent())
	}

	// Overwrite: the previous value surfaces, but with its timestamp stripped.
	{
		overwriteStart := db.Clock().Now()
		require.NoError(t, db.Put(ctx, mkKey("a"), 2))
		overwriteEnd := db.Clock().Now()

		v := <-rows
		require.Equal(t, 2, mustGetInt(v.Value))
		require.True(t, overwriteStart.Less(v.Value.Timestamp) && v.Value.Timestamp.Less(overwriteEnd))
		require.True(t, v.PrevValue.IsPresent())
		require.Equal(t, 1, mustGetInt(v.PrevValue))
		require.True(t, v.PrevValue.Timestamp.IsEmpty())
	}

	// Delete: the new value is a tombstone (not present) but still carries the
	// deletion timestamp; the previous value is surfaced without a timestamp.
	{
		delStart := db.Clock().Now()
		require.NoError(t, db.Del(ctx, mkKey("a")))
		delEnd := db.Clock().Now()

		v := <-rows
		require.False(t, v.Value.IsPresent())
		require.True(t, delStart.Less(v.Value.Timestamp) && v.Value.Timestamp.Less(delEnd))
		require.True(t, v.PrevValue.IsPresent())
		require.Equal(t, 2, mustGetInt(v.PrevValue))
		require.True(t, v.PrevValue.Timestamp.IsEmpty())
	}

	// Idempotent delete: deleting an already-deleted key still emits an event,
	// this time with a non-present previous value.
	{
		delStart := db.Clock().Now()
		require.NoError(t, db.Del(ctx, mkKey("a")))
		delEnd := db.Clock().Now()

		v := <-rows
		require.False(t, v.Value.IsPresent())
		require.True(t, delStart.Less(v.Value.Timestamp) && v.Value.Timestamp.Less(delEnd))
		require.False(t, v.PrevValue.IsPresent())
		require.True(t, v.PrevValue.Timestamp.IsEmpty())
	}
}
45 changes: 45 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library for the memory-bounded rangefeed buffer (buffer.go). Deps mirror the
# imports in buffer.go exactly: hlc, log, syncutil, and cockroachdb/errors.
#
# NOTE: an earlier revision declared a second go_library ("buffer") over the
# same srcs with importpath ".../rangefeed/buffer" and a long list of deps
# (changefeedbase, jobspb, roachpb, settings, logcrash, mon, quotapool,
# timeutil) that buffer.go does not import. That duplicate rule compiled the
# file twice under conflicting import paths and tripped the unused-dependency
# check, so it has been removed.
go_library(
    name = "rangefeedbuffer",
    srcs = ["buffer.go"],
    importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/util/hlc",
        "//pkg/util/log",
        "//pkg/util/syncutil",
        "@com_github_cockroachdb_errors//:errors",
    ],
)

# Unit tests for the buffer; registered in pkg/BUILD.bazel's ALL_TESTS as
# //pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test.
go_test(
    name = "rangefeedbuffer_test",
    srcs = ["buffer_test.go"],
    deps = [
        ":rangefeedbuffer",
        "//pkg/util/hlc",
        "//pkg/util/leaktest",
        "@com_github_stretchr_testify//require",
    ],
)
102 changes: 102 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeedbuffer

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// ErrBufferLimitExceeded is returned by the buffer when attempting to add more
// events than the limit the buffer is configured with. It is a sentinel error:
// callers should test for it with errors.Is.
var ErrBufferLimitExceeded = errors.New("buffer limit exceeded")

// Event is the unit of what can be added to the buffer. The only requirement
// is that an event can report the timestamp it occurred at; the buffer sorts,
// deduplicates-by-frontier, and flushes solely on this timestamp.
type Event interface {
	Timestamp() hlc.Timestamp
}

// Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It
// accumulates raw events which can then be flushed out in timestamp sorted
// order en-masse whenever the rangefeed frontier is bumped. If we accumulate
// more events than the limit allows for, we error out to the caller.
type Buffer struct {
	// limit caps the number of buffered events; Add returns
	// ErrBufferLimitExceeded once it would be exceeded.
	limit int

	mu struct {
		syncutil.Mutex

		// events is an embedded field (field name == type name); it holds the
		// not-yet-flushed events. It is only sorted during Flush.
		events
		// frontier is the highest frontier timestamp passed to Flush so far;
		// Add discards events at or below it.
		frontier hlc.Timestamp
	}
}

// New constructs a Buffer with the provided limit.
func New(limit int) *Buffer {
	b := &Buffer{}
	b.limit = limit
	return b
}

// Add adds the given entry to the buffer.
func (b *Buffer) Add(ctx context.Context, ev Event) error {
b.mu.Lock()
defer b.mu.Unlock()

if ev.Timestamp().LessEq(b.mu.frontier) {
// If the entry is at a timestamp less than or equal to our last known
// frontier, we can discard it.
return nil
}

if b.mu.events.Len()+1 > b.limit {
return ErrBufferLimitExceeded
}

b.mu.events = append(b.mu.events, ev)
return nil
}

// Flush returns the timestamp sorted list of accumulated events with timestamps
// less than or equal to the provided frontier timestamp. The timestamp is
// recorded (expected to monotonically increase), and future events with
// timestamps less than or equal to it are discarded.
func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// The frontier is expected to only ever move forward; a regression is a
	// programming error.
	if frontier.Less(b.mu.frontier) {
		log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier)
	}

	// Sort everything, then locate the first event strictly above the
	// frontier; the prefix before it is what gets flushed out.
	sort.Sort(&b.mu.events)
	cutoff := sort.Search(len(b.mu.events), func(i int) bool {
		return frontier.Less(b.mu.events[i].Timestamp())
	})

	events, b.mu.events = b.mu.events[:cutoff], b.mu.events[cutoff:]
	b.mu.frontier = frontier
	return events
}

// events is a collection of Events, sortable by ascending timestamp.
type events []Event

// Compile-time assertion that *events satisfies sort.Interface.
var _ sort.Interface = (*events)(nil)

// Len implements sort.Interface.
func (es *events) Len() int { return len(*es) }

// Less implements sort.Interface; events order by ascending timestamp.
func (es *events) Less(i, j int) bool {
	lhs, rhs := (*es)[i], (*es)[j]
	return lhs.Timestamp().Less(rhs.Timestamp())
}

// Swap implements sort.Interface.
func (es *events) Swap(i, j int) {
	(*es)[i], (*es)[j] = (*es)[j], (*es)[i]
}
Loading