Skip to content

Commit

Permalink
spanconfig: introduce the spanconfig.SQLWatcher
Browse files Browse the repository at this point in the history
This patch introduces the SQLWatcher, which is intended to incrementally
watch for updates to system.zones and system.descriptors. It does so by
establishing rangefeeds at a given timestamp.

The SQLWatcher invokes a callback from time to time  with a list of
updates that have been observed in the window
(previous checkpointTS, checkpointTS]. The checkpointTS is also
provided to the callback.

Internally, the SQLWatcher uses a buffer to keep track of events
generated by the SQLWatcher's rangefeeds. It also tracks the individual
frontier timestamps of both the rangefeeds. This helps to maintain the
notion of the combined frontier timestamp, which is computed as the
minimum of the two. This combined frontier timestamp serves as the
checkpoint to the SQLWatcher's callback function.

This interface isn't hooked up to anything yet. It'll be used by the
sponconfig.Reconciler soon to perform partial reconciliation once
full reconciliation is done. It is intended that the IDs from the
updates produced by the SQLWatcher will be fed into the SQLTranslator.

References cockroachdb#67679
Carved from cockroachdb#69661

Release note: None
  • Loading branch information
arulajmani committed Nov 8, 2021
1 parent f29118b commit 0c172fa
Show file tree
Hide file tree
Showing 23 changed files with 1,447 additions and 43 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ ALL_TESTS = [
"//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test",
"//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test",
"//pkg/spanconfig/spanconfigstore:spanconfigstore_test",
"//pkg/spanconfig/spanconfigtestutils:spanconfigtestutils_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (t *testServerShim) DistSenderI() interface{} { panic(unsuppor
func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigSQLWatcher() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) }
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func New(limit int) *Buffer {
}

// Add adds the given entry to the buffer.
func (b *Buffer) Add(ctx context.Context, ev Event) error {
func (b *Buffer) Add(ev Event) error {
b.mu.Lock()
defer b.mu.Unlock()

Expand Down
26 changes: 13 additions & 13 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func TestBuffer(t *testing.T) {
}

{ // Flushing at a timestamp lower than buffered events should return nothing.
require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12
require.NoError(t, buffer.Add(ctx, makeEvent("e", ts(18)))) // e@18
require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12
require.NoError(t, buffer.Add(makeEvent("e", ts(18)))) // e@18

events := buffer.Flush(ctx, ts(10))
require.True(t, len(events) == 0)
Expand All @@ -75,18 +75,18 @@ func TestBuffer(t *testing.T) {
}

{ // Adding events with timestamps <= the last flush are discarded.
require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12
require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13
require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11
require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15
require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12

events := buffer.Flush(ctx, ts(15))
require.True(t, len(events) == 0)
}

{ // Additional events are flushed out at appropriate points.
require.NoError(t, buffer.Add(ctx, makeEvent("f", ts(19)))) // f@19
require.NoError(t, buffer.Add(ctx, makeEvent("g", ts(21)))) // g@21
require.NoError(t, buffer.Add(makeEvent("f", ts(19)))) // f@19
require.NoError(t, buffer.Add(makeEvent("g", ts(21)))) // g@21

events := buffer.Flush(ctx, ts(20))
require.True(t, len(events) == 2)
Expand All @@ -107,10 +107,10 @@ func TestBuffer(t *testing.T) {

{ // Ensure that buffer limits are respected.
for i := 0; i < limit; i++ {
require.NoError(t, buffer.Add(ctx, makeEvent("x", ts(101)))) // x@101
require.NoError(t, buffer.Add(makeEvent("x", ts(101)))) // x@101
}

err := buffer.Add(ctx, makeEvent("x", ts(101)))
err := buffer.Add(makeEvent("x", ts(101)))
require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ go_library(
"//pkg/spanconfig/spanconfigkvsubscriber",
"//pkg/spanconfig/spanconfigmanager",
"//pkg/spanconfig/spanconfigsqltranslator",
"//pkg/spanconfig/spanconfigsqlwatcher",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqltranslator"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
Expand Down Expand Up @@ -842,13 +843,22 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set.
spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs)
sqlTranslator := spanconfigsqltranslator.New(execCfg, codec)
sqlWatcher := spanconfigsqlwatcher.New(
codec,
cfg.Settings,
cfg.rangeFeedFactory,
1<<20, /* 1 MB bufferMemLimit */
cfg.stopper,
spanConfigKnobs,
)
spanConfigMgr = spanconfigmanager.New(
cfg.db,
jobRegistry,
cfg.circularInternalExecutor,
cfg.stopper,
cfg.Settings,
cfg.spanConfigAccessor,
sqlWatcher,
sqlTranslator,
spanConfigKnobs,
)
Expand Down
10 changes: 10 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,16 @@ func (ts *TestServer) SpanConfigSQLTranslator() interface{} {
return ts.sqlServer.spanconfigMgr.SQLTranslator
}

// SpanConfigSQLWatcher is part of TestServerInterface.
func (ts *TestServer) SpanConfigSQLWatcher() interface{} {
if ts.sqlServer.spanconfigMgr == nil {
panic(
"span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs",
)
}
return ts.sqlServer.spanconfigMgr.SQLWatcher
}

// SQLServer is part of TestServerInterface.
func (ts *TestServer) SQLServer() interface{} {
return ts.PGServer().SQLServer
Expand Down
2 changes: 2 additions & 0 deletions pkg/spanconfig/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"spanconfig.go",
"testing_knobs.go",
"types.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig",
visibility = ["//visibility:public"],
Expand All @@ -14,5 +15,6 @@ go_library(
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog/descpb",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)
55 changes: 27 additions & 28 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,32 @@ func FullTranslate(
return s.Translate(ctx, descpb.IDs{keys.RootNamespaceID})
}

// SQLWatcher watches for events on system.zones and system.descriptors.
type SQLWatcher interface {
// WatchForSQLUpdates watches for changes to zones and descriptors starting at
// the given timestamp (exclusive), informing callers using the handler
// callback.
//
// The handler callback is invoked from time to time with a list of updates
// and a checkpointTS. Invocations of the handler callback provide the
// following semantics:
// 1. Calls to the handler are serial.
// 2. The timestamp supplied to the handler is monotonically increasing.
// 3. The list of DescriptorUpdates supplied to handler includes all events
// in the window (prevInvocationCheckpointTS, checkpointTS].
// 4. No further calls to the handler are made if an invocation returns an
// error.
//
// These guarantees mean that users of this interface are free to persist the
// checkpointTS and later use it to re-establish the SQLWatcher without
// missing any updates.
WatchForSQLUpdates(
ctx context.Context,
startTS hlc.Timestamp,
handler func(ctx context.Context, updates []DescriptorUpdate, checkpointTS hlc.Timestamp) error,
) error
}

// ReconciliationDependencies captures what's needed by the span config
// reconciliation job to perform its task. The job is responsible for
// reconciling a tenant's zone configurations with the clusters span
Expand All @@ -126,11 +152,7 @@ type ReconciliationDependencies interface {

SQLTranslator

// TODO(arul): We'll also want access to a "SQLWatcher", something that
// watches for changes to system.{descriptors, zones} to feed IDs to the
// SQLTranslator. These interfaces will be used by the "Reconciler to perform
// full/partial reconciliation, checkpoint the span config job, and update KV
// with the tenants span config state.
SQLWatcher
}

// Store is a data structure used to store spans and their corresponding
Expand Down Expand Up @@ -217,26 +239,3 @@ type StoreReader interface {
ComputeSplitKey(ctx context.Context, start, end roachpb.RKey) roachpb.RKey
GetSpanConfigForKey(ctx context.Context, key roachpb.RKey) (roachpb.SpanConfig, error)
}

// Update captures a span and the corresponding config change. It's the unit of
// what can be applied to a StoreWriter.
type Update struct {
// Span captures the key span being updated.
Span roachpb.Span

// Config captures the span config the key span was updated to. An empty
// config indicates the span config being deleted.
Config roachpb.SpanConfig
}

// Deletion returns true if the update corresponds to a span config being
// deleted.
func (u Update) Deletion() bool {
return u.Config.IsEmpty()
}

// Addition returns true if the update corresponds to a span config being
// added.
func (u Update) Addition() bool {
return !u.Deletion()
}
2 changes: 1 addition & 1 deletion pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (s *KVSubscriber) run(ctx context.Context) error {
update.Config = entry.Config
}

if err := buffer.Add(ctx, &bufferEvent{update, ev.Value.Timestamp}); err != nil {
if err := buffer.Add(&bufferEvent{update, ev.Value.Timestamp}); err != nil {
select {
case <-ctx.Done():
// The context is canceled when the rangefeed is closed by the
Expand Down
3 changes: 3 additions & 0 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Manager struct {
knobs *spanconfig.TestingKnobs

spanconfig.KVAccessor
spanconfig.SQLWatcher
spanconfig.SQLTranslator
}

Expand All @@ -73,6 +74,7 @@ func New(
stopper *stop.Stopper,
settings *cluster.Settings,
kvAccessor spanconfig.KVAccessor,
sqlWatcher spanconfig.SQLWatcher,
sqlTranslator spanconfig.SQLTranslator,
knobs *spanconfig.TestingKnobs,
) *Manager {
Expand All @@ -86,6 +88,7 @@ func New(
stopper: stopper,
settings: settings,
KVAccessor: kvAccessor,
SQLWatcher: sqlWatcher,
SQLTranslator: sqlTranslator,
knobs: knobs,
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/spanconfig/spanconfigmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerCreatedJobInterceptor: func(jobI interface{}) {
Expand Down Expand Up @@ -162,6 +163,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) {
Expand Down Expand Up @@ -237,6 +239,7 @@ func TestManagerCheckJobConditions(t *testing.T) {
ts.Stopper(),
ts.ClusterSettings(),
ts.SpanConfigAccessor().(spanconfig.KVAccessor),
ts.SpanConfigSQLWatcher().(spanconfig.SQLWatcher),
ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator),
&spanconfig.TestingKnobs{
ManagerDisableJobCreation: true,
Expand Down
65 changes: 65 additions & 0 deletions pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spanconfigsqlwatcher",
srcs = [
"buffer.go",
"sqlwatcher.go",
"zonesdecoder.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/roachpb:with-mocks",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/stop",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "spanconfigsqlwatcher_test",
srcs = [
"buffer_test.go",
"main_test.go",
"sqlwatcher_test.go",
"zonesdecoder_test.go",
],
embed = [":spanconfigsqlwatcher"],
deps = [
"//pkg/base",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/spanconfig",
"//pkg/sql/catalog/descpb",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//proto",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit 0c172fa

Please sign in to comment.