diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 5e4a61d178d1..42bbe62d16aa 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -185,6 +185,7 @@ ALL_TESTS = [ "//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test", "//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test", "//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test", + "//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test", "//pkg/spanconfig/spanconfigstore:spanconfigstore_test", "//pkg/spanconfig/spanconfigtestutils:spanconfigtestutils_test", "//pkg/sql/catalog/catalogkeys:catalogkeys_test", diff --git a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go index 5f8617371a26..3ce3711ef227 100644 --- a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go +++ b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go @@ -51,32 +51,33 @@ func (t *testServerShim) ServingSQLAddr() string { return t.SQLAddr() } -func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) } -func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) } -func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) } -func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) } -func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) } -func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) } -func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) } -func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) } -func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) } -func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) } -func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) } +func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) } +func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) } +func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) } +func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) } +func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) } +func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) } +func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) } +func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) } +func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SpanConfigSQLWatcherFactory() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) } +func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) } func (t *testServerShim) SetDistSQLSpanResolver(spanResolver interface{}) { panic(unsupportedShimMethod) } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go index bc54e4b9d2b0..86045b957cc2 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go @@ -50,7 +50,7 @@ func New(limit int) *Buffer { } // Add adds the given entry to the buffer. -func (b *Buffer) Add(ctx context.Context, ev Event) error { +func (b *Buffer) Add(ev Event) error { b.mu.Lock() defer b.mu.Unlock() diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go index f729a96705e3..5188beae8e36 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go @@ -48,11 +48,11 @@ func TestBuffer(t *testing.T) { } { // Flushing at a timestamp lower than buffered events should return nothing. - require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13 - require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11 - require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15 - require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12 - require.NoError(t, buffer.Add(ctx, makeEvent("e", ts(18)))) // e@18 + require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13 + require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11 + require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15 + require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12 + require.NoError(t, buffer.Add(makeEvent("e", ts(18)))) // e@18 events := buffer.Flush(ctx, ts(10)) require.True(t, len(events) == 0) @@ -75,18 +75,18 @@ func TestBuffer(t *testing.T) { } { // Adding events with timestamps <= the last flush are discarded. - require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13 - require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11 - require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15 - require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12 + require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13 + require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11 + require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15 + require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12 events := buffer.Flush(ctx, ts(15)) require.True(t, len(events) == 0) } { // Additional events are flushed out at appropriate points. - require.NoError(t, buffer.Add(ctx, makeEvent("f", ts(19)))) // f@19 - require.NoError(t, buffer.Add(ctx, makeEvent("g", ts(21)))) // g@21 + require.NoError(t, buffer.Add(makeEvent("f", ts(19)))) // f@19 + require.NoError(t, buffer.Add(makeEvent("g", ts(21)))) // g@21 events := buffer.Flush(ctx, ts(20)) require.True(t, len(events) == 2) @@ -107,10 +107,10 @@ func TestBuffer(t *testing.T) { { // Ensure that buffer limits are respected. for i := 0; i < limit; i++ { - require.NoError(t, buffer.Add(ctx, makeEvent("x", ts(101)))) // x@101 + require.NoError(t, buffer.Add(makeEvent("x", ts(101)))) // x@101 } - err := buffer.Add(ctx, makeEvent("x", ts(101))) + err := buffer.Add(makeEvent("x", ts(101))) require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded) } } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 80ac4fd11f22..1c244a07c225 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -116,6 +116,7 @@ go_library( "//pkg/spanconfig/spanconfigkvsubscriber", "//pkg/spanconfig/spanconfigmanager", "//pkg/spanconfig/spanconfigsqltranslator", + "//pkg/spanconfig/spanconfigsqlwatcher", "//pkg/sql", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f3177deb2eee..9ed9363567ae 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqltranslator" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" @@ -842,6 +843,14 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { // only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set. spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) sqlTranslator := spanconfigsqltranslator.New(execCfg, codec) + sqlWatcherFactory := spanconfigsqlwatcher.NewFactory( + codec, + cfg.Settings, + cfg.rangeFeedFactory, + 1<<20, /* 1 MB bufferMemLimit */ + cfg.stopper, + spanConfigKnobs, + ) spanConfigMgr = spanconfigmanager.New( cfg.db, jobRegistry, @@ -849,6 +858,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.stopper, cfg.Settings, cfg.spanConfigAccessor, + sqlWatcherFactory, sqlTranslator, spanConfigKnobs, ) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 9b71c7d6f5ee..03ea76d8c832 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -961,6 +961,16 @@ func (ts *TestServer) SpanConfigSQLTranslator() interface{} { return ts.sqlServer.spanconfigMgr.SQLTranslator } +// SpanConfigSQLWatcherFactory is part of TestServerInterface. +func (ts *TestServer) SpanConfigSQLWatcherFactory() interface{} { + if ts.sqlServer.spanconfigMgr == nil { + panic( + "span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs", + ) + } + return ts.sqlServer.spanconfigMgr.SQLWatcherFactory +} + // SQLServer is part of TestServerInterface. func (ts *TestServer) SQLServer() interface{} { return ts.PGServer().SQLServer diff --git a/pkg/spanconfig/BUILD.bazel b/pkg/spanconfig/BUILD.bazel index 6f3d8fc5a0b0..b85ee3515697 100644 --- a/pkg/spanconfig/BUILD.bazel +++ b/pkg/spanconfig/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/base", "//pkg/keys", "//pkg/roachpb:with-mocks", + "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/util/hlc", ], diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 97e4c6dc7f72..469ca83a56aa 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -117,6 +118,59 @@ func FullTranslate( return s.Translate(ctx, descpb.IDs{keys.RootNamespaceID}) } +// SQLWatcher watches for events on system.zones and system.descriptors. +type SQLWatcher interface { + // WatchForSQLUpdates watches for updates to zones and descriptors starting at + // the given timestamp (exclusive), informing callers using the handler + // callback. + // + // The handler callback[1] is invoked from time to time with a list of updates + // and a checkpointTS. Invocations of the handler callback provide the + // following semantics: + // 1. Calls to the handler are serial. + // 2. The timestamp supplied to the handler is monotonically increasing. + // 3. The list of DescriptorUpdates supplied to handler includes all events + // in the window (prevInvocationCheckpointTS, checkpointTS]. + // 4. No further calls to the handler are made if a call to the handler + // returns an error. + // + // These guarantees mean that users of this interface are free to persist the + // checkpointTS and later use it to re-establish a SQLWatcher without missing + // any updates. + // + // WatchForSQLUpdates can only ever be called once, effectively making the + // SQLWatcher a single use interface. + // + // WatchForSQLUpdates may run out of memory and return an error if it is + // tracking too many events between two checkpoints. + // + // [1] Users of this interface should not intend to do expensive work in the + // handler callback. + // TODO(arul): Possibly get rid of this limitation. + WatchForSQLUpdates( + ctx context.Context, + startTS hlc.Timestamp, + handler func(ctx context.Context, updates []DescriptorUpdate, checkpointTS hlc.Timestamp) error, + ) error +} + +// DescriptorUpdate captures the ID and type of a descriptor or zone that the +// SQLWatcher has observed updated. +type DescriptorUpdate struct { + // ID of the descriptor/zone that has been updated. + ID descpb.ID + + // DescriptorType of the descriptor/zone that has been updated. Could be either + // the specific type or catalog.Any if no information is available. + DescriptorType catalog.DescriptorType +} + +// SQLWatcherFactory is used to construct new SQLWatchers. +type SQLWatcherFactory interface { + // New returns a new SQLWatcher. + New() SQLWatcher +} + // ReconciliationDependencies captures what's needed by the span config // reconciliation job to perform its task. The job is responsible for // reconciling a tenant's zone configurations with the clusters span @@ -126,11 +180,7 @@ type ReconciliationDependencies interface { SQLTranslator - // TODO(arul): We'll also want access to a "SQLWatcher", something that - // watches for changes to system.{descriptors, zones} to feed IDs to the - // SQLTranslator. These interfaces will be used by the "Reconciler to perform - // full/partial reconciliation, checkpoint the span config job, and update KV - // with the tenants span config state. + SQLWatcherFactory } // Store is a data structure used to store spans and their corresponding @@ -235,8 +285,7 @@ func (u Update) Deletion() bool { return u.Config.IsEmpty() } -// Addition returns true if the update corresponds to a span config being -// added. +// Addition returns true if the update corresponds to a span config being added. func (u Update) Addition() bool { return !u.Deletion() } diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go index 77fca41a25c1..94405b6fa495 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go @@ -260,7 +260,7 @@ func (s *KVSubscriber) run(ctx context.Context) error { update.Config = entry.Config } - if err := buffer.Add(ctx, &bufferEvent{update, ev.Value.Timestamp}); err != nil { + if err := buffer.Add(&bufferEvent{update, ev.Value.Timestamp}); err != nil { select { case <-ctx.Done(): // The context is canceled when the rangefeed is closed by the diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index e4130f722d14..010fe2e8d9de 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -60,6 +60,7 @@ type Manager struct { knobs *spanconfig.TestingKnobs spanconfig.KVAccessor + spanconfig.SQLWatcherFactory spanconfig.SQLTranslator } @@ -73,6 +74,7 @@ func New( stopper *stop.Stopper, settings *cluster.Settings, kvAccessor spanconfig.KVAccessor, + sqlWatcherFactory spanconfig.SQLWatcherFactory, sqlTranslator spanconfig.SQLTranslator, knobs *spanconfig.TestingKnobs, ) *Manager { @@ -80,14 +82,15 @@ func New( knobs = &spanconfig.TestingKnobs{} } return &Manager{ - db: db, - jr: jr, - ie: ie, - stopper: stopper, - settings: settings, - KVAccessor: kvAccessor, - SQLTranslator: sqlTranslator, - knobs: knobs, + db: db, + jr: jr, + ie: ie, + stopper: stopper, + settings: settings, + KVAccessor: kvAccessor, + SQLWatcherFactory: sqlWatcherFactory, + SQLTranslator: sqlTranslator, + knobs: knobs, } } diff --git a/pkg/spanconfig/spanconfigmanager/manager_test.go b/pkg/spanconfig/spanconfigmanager/manager_test.go index a8b0a7bd7ab2..067befba1464 100644 --- a/pkg/spanconfig/spanconfigmanager/manager_test.go +++ b/pkg/spanconfig/spanconfigmanager/manager_test.go @@ -74,6 +74,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory), ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), &spanconfig.TestingKnobs{ ManagerCreatedJobInterceptor: func(jobI interface{}) { @@ -162,6 +163,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory), ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), &spanconfig.TestingKnobs{ ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) { @@ -237,6 +239,7 @@ func TestManagerCheckJobConditions(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory), ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, diff --git a/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel new file mode 100644 index 000000000000..bb47eba6a7c7 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel @@ -0,0 +1,67 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "spanconfigsqlwatcher", + srcs = [ + "buffer.go", + "sqlwatcher.go", + "zonesdecoder.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher", + visibility = ["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", + "//pkg/roachpb:with-mocks", + "//pkg/settings/cluster", + "//pkg/spanconfig", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/log/logcrash", + "//pkg/util/stop", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "spanconfigsqlwatcher_test", + srcs = [ + "buffer_test.go", + "main_test.go", + "sqlwatcher_test.go", + "zonesdecoder_test.go", + ], + embed = [":spanconfigsqlwatcher"], + deps = [ + "//pkg/base", + "//pkg/config/zonepb", + "//pkg/jobs", + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/spanconfig", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/protoutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_gogo_protobuf//proto", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go new file mode 100644 index 000000000000..8a50ecd2cba1 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go @@ -0,0 +1,159 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher + +import ( + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// buffer is a helper struct for the SQLWatcher. It buffers events generated by +// the SQLWatcher's rangefeeds over system.zones and system.descriptors. It is +// safe for concurrent use. +// +// The buffer tracks frontier timestamps for both these rangefeeds as well. It +// maintains the notion of the combined frontier timestamp computed as the +// minimum of the two. This is used when flushing the buffer periodically. +type buffer struct { + mu struct { + syncutil.Mutex + + // rangefeed.Buffer stores spanconfigsqlwatcher.Events. + buffer *rangefeedbuffer.Buffer + + // rangefeedFrontiers tracks the frontier timestamps of individual + // rangefeeds established by the SQLWatcher. + rangefeedFrontiers [numRangefeeds]hlc.Timestamp + } +} + +// event is the unit produced by the rangefeeds the SQLWatcher establishes over +// system.zones and system.descriptors. It implements the rangefeedbuffer.Event +// interface. +type event struct { + // timestamp at which the event was generated by the rangefeed. + timestamp hlc.Timestamp + + // update captures information about the descriptor or zone that the + // SQLWatcher has observed change. + update spanconfig.DescriptorUpdate +} + +// Timestamp implements the rangefeedbuffer.Event interface. +func (e event) Timestamp() hlc.Timestamp { + return e.timestamp +} + +// rangefeedKind is used to identify the distinct rangefeeds {descriptors, +// zones} established by the SQLWatcher. +type rangefeedKind int + +const ( + zonesRangefeed rangefeedKind = iota + descriptorsRangefeed + + // numRangefeeds should be listed last. + numRangefeeds int = iota +) + +// newBuffer constructs and returns a new buffer. +func newBuffer(limit int) *buffer { + rangefeedBuffer := rangefeedbuffer.New(limit) + eventBuffer := &buffer{} + eventBuffer.mu.buffer = rangefeedBuffer + return eventBuffer +} + +// advance advances the frontier for the given rangefeed. +func (b *buffer) advance(rangefeed rangefeedKind, timestamp hlc.Timestamp) { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.rangefeedFrontiers[rangefeed].Forward(timestamp) +} + +// add records the given event in the buffer. +func (b *buffer) add(ev event) error { + b.mu.Lock() + defer b.mu.Unlock() + return b.mu.buffer.Add(ev) +} + +// flushEvents computes the combined frontier timestamp of the buffer and +// returns a list of relevant events which were buffered up to that timestamp. +func (b *buffer) flushEvents( + ctx context.Context, +) (updates []rangefeedbuffer.Event, combinedFrontierTS hlc.Timestamp) { + b.mu.Lock() + defer b.mu.Unlock() + // First we determine the checkpoint timestamp, which is the minimum + // checkpoint timestamp of all event types. + combinedFrontierTS = hlc.MaxTimestamp + for _, ts := range b.mu.rangefeedFrontiers { + combinedFrontierTS.Backward(ts) + } + + return b.mu.buffer.Flush(ctx, combinedFrontierTS), combinedFrontierTS +} + +// flush computes the combined frontier timestamp of the buffer and returns a +// list of unique spanconfig.DescriptorUpdates below this timestamp. The +// combined frontier timestamp is also returned. +func (b *buffer) flush( + ctx context.Context, +) (updates []spanconfig.DescriptorUpdate, _ hlc.Timestamp, _ error) { + events, combinedFrontierTS := b.flushEvents(ctx) + sort.Slice(events, func(i, j int) bool { + ei, ej := events[i].(event), events[j].(event) + if ei.update.ID == ej.update.ID { + return ei.timestamp.Less(ej.timestamp) + } + return ei.update.ID < ej.update.ID + }) + for i, ev := range events { + if i == 0 || events[i-1].(event).update.ID != ev.(event).update.ID { + updates = append(updates, ev.(event).update) + continue + } + descType, err := combine(updates[len(updates)-1].DescriptorType, ev.(event).update.DescriptorType) + if err != nil { + return nil, hlc.Timestamp{}, err + } + updates[len(updates)-1].DescriptorType = descType + } + return updates, combinedFrontierTS, nil +} + +// combine takes two catalog.DescriptorTypes and combines them according to the +// following semantics: +// - Any can combine with any concrete descriptor type (including itself). +// Concrete descriptor types are {Table,Database,Schema,Type} descriptor types. +// - Concrete descriptor types can combine with themselves. +// - A concrete descriptor type cannot combine with another concrete descriptor +// type. +func combine(d1 catalog.DescriptorType, d2 catalog.DescriptorType) (catalog.DescriptorType, error) { + if d1 == d2 { + return d1, nil + } + if d1 == catalog.Any { + return d2, nil + } + if d2 == catalog.Any { + return d1, nil + } + return catalog.Any, errors.AssertionFailedf("cannot combine %s and %s", d1, d2) +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go new file mode 100644 index 000000000000..1005940394b7 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go @@ -0,0 +1,235 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestBuffer adds DescriptorUpdate events to the buffer and ensures that +// checkpointing and ID flushing semantics work correctly. +func TestBuffer(t *testing.T) { + defer leaktest.AfterTest(t)() + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: int64(nanos), + } + } + makeEvent := func(descID int, timestamp hlc.Timestamp) event { + return event{ + update: spanconfig.DescriptorUpdate{ + ID: descpb.ID(descID), + DescriptorType: catalog.Any, + }, + timestamp: timestamp, + } + } + makeUpdates := func(descIDs ...int) []spanconfig.DescriptorUpdate { + updates := make([]spanconfig.DescriptorUpdate, 0, len(descIDs)) + for _, descID := range descIDs { + updates = append(updates, spanconfig.DescriptorUpdate{ + ID: descpb.ID(descID), + DescriptorType: catalog.Any, + }) + } + return updates + } + + ctx := context.Background() + buffer := newBuffer(10 /* limit */) + + // Sanity check the newly initialized event buffer. + updates, combinedFrontierTS, err := buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(0), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Add a few events without advancing any of the frontiers. We don't expect + // anything to be returned by the call to flush yet. + err = buffer.add(makeEvent(1, ts(10))) + require.NoError(t, err) + + err = buffer.add(makeEvent(2, ts(11))) + require.NoError(t, err) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(0), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Advance the zones frontier. We expect flush to still not return any results + // as the descriptors frontier hasn't been advanced yet. + buffer.advance(zonesRangefeed, ts(11)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(0), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Advance the descriptors frontier to a lower timestamp than the zones + // frontier above. Flush should now return the lower timestamp as the + // combinedFrontierTS. Furthermore, we only expect one id to be returned. + buffer.advance(descriptorsRangefeed, ts(10)) + require.NoError(t, err) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(10), combinedFrontierTS) + require.Equal(t, makeUpdates(1), updates) + + // Bump the descriptors frontier past the zones frontier. This should bump the + // combinedFrontierTS of the buffer to 11, resulting in flush returning the + // last ID in the buffer. + buffer.advance(descriptorsRangefeed, ts(20)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.Equal(t, makeUpdates(2), updates) + + // No updates are left in the buffer below the combined frontier (which hasn't + // changed from above). + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Try regressing the zones frontier by advancing it to a timestamp below what + // it was previously advanced to (11). This should essentially no-op, which we + // can check by looking at the combinedFrontierTS from the call to flush. + buffer.advance(zonesRangefeed, ts(5)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Try adding an event at a TS below the combinedFrontierTS. This should no-op + // as well and the ID should not be returned when flushing. + err = buffer.add(makeEvent(1, ts(5))) + require.NoError(t, err) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Lastly, ensure that flushing doesn't return duplicate IDs even when more + // than one updates have been added for a given ID. + err = buffer.add(makeEvent(1, ts(12))) + require.NoError(t, err) + err = buffer.add(makeEvent(1, ts(13))) + require.NoError(t, err) + err = buffer.add(makeEvent(2, ts(14))) + require.NoError(t, err) + buffer.advance(zonesRangefeed, ts(14)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(14), combinedFrontierTS) + require.Equal(t, makeUpdates(1, 2), updates) +} + +// TestBufferCombinesDescriptorTypes ensures that the descriptor type of the +// same ID is combined correctly when flushing. It also ensures that the +// semantics around flushing are sound, in particular: +// - Type spanconfig.Any can combine with any concrete descriptor type +// (and itself). +// - A concrete descriptor type can combine with itself. +// - A concrete descriptor type cannot combine with a different concrete +// descriptor type. +func TestBufferCombinesDescriptorTypes(t *testing.T) { + defer leaktest.AfterTest(t)() + + buffer := newBuffer(10 /* limit */) + ctx := context.Background() + + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: int64(nanos), + } + } + makeEvent := func(descID int, descType catalog.DescriptorType, timestamp hlc.Timestamp) event { + return event{ + update: spanconfig.DescriptorUpdate{ + ID: descpb.ID(descID), + DescriptorType: descType, + }, + timestamp: timestamp, + } + } + advanceRangefeeds := func(timestamp hlc.Timestamp) { + buffer.advance(descriptorsRangefeed, timestamp) + buffer.advance(zonesRangefeed, timestamp) + } + makeUpdates := func(descID int, descType catalog.DescriptorType) []spanconfig.DescriptorUpdate { + return []spanconfig.DescriptorUpdate{ + { + ID: descpb.ID(descID), + DescriptorType: descType, + }, + } + } + + concreteDescriptorTypes := []catalog.DescriptorType{ + catalog.Table, + catalog.Database, + catalog.Schema, + catalog.Type, + } + + nanos := 1 + // Test 3 events on the same ID, with a concrete descriptor event + // (eg. catalog.Table) flanked by catalog.Any on either side. This + // should all combine to the concrete descriptor type. + { + for _, descType := range concreteDescriptorTypes { + err := buffer.add(makeEvent(1, catalog.Any, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, descType, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, catalog.Any, ts(nanos))) + require.NoError(t, err) + nanos++ + advanceRangefeeds(ts(nanos)) + + updates, combinedFrontierTS, err := buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(nanos), combinedFrontierTS) + require.Equal(t, updates, makeUpdates(1, descType)) + } + } + + // Test all descriptor types (including spanconfig.Any) combine with each + // other. + { + for _, descType := range append(concreteDescriptorTypes, catalog.Any) { + err := buffer.add(makeEvent(1, descType, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, descType, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, catalog.Any, ts(nanos))) + require.NoError(t, err) + nanos++ + advanceRangefeeds(ts(nanos)) + + updates, combinedFrontierTS, err := buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(nanos), combinedFrontierTS) + require.Equal(t, updates, makeUpdates(1, descType)) + } + } +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/main_test.go b/pkg/spanconfig/spanconfigsqlwatcher/main_test.go new file mode 100644 index 000000000000..df87ac32c5f4 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go new file mode 100644 index 000000000000..9026269e89b4 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go @@ -0,0 +1,335 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher + +import ( + "context" + "sync/atomic" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" +) + +// Factory implements the spanconfig.SQLWatcherFactory interface. +var _ spanconfig.SQLWatcherFactory = &Factory{} + +// SQLWatcher implements the spanconfig.SQLWatcher interface. +var _ spanconfig.SQLWatcher = &SQLWatcher{} + +// Factory is used to construct spanconfig.SQLWatcher interfaces. +type Factory struct { + codec keys.SQLCodec + settings *cluster.Settings + stopper *stop.Stopper + knobs *spanconfig.TestingKnobs + rangeFeedFactory *rangefeed.Factory + bufferMemLimit int64 +} + +// NewFactory constructs a new Factory. +func NewFactory( + codec keys.SQLCodec, + settings *cluster.Settings, + rangeFeedFactory *rangefeed.Factory, + bufferMemLimit int64, + stopper *stop.Stopper, + knobs *spanconfig.TestingKnobs, +) *Factory { + if knobs == nil { + knobs = &spanconfig.TestingKnobs{} + } + return &Factory{ + codec: codec, + settings: settings, + rangeFeedFactory: rangeFeedFactory, + stopper: stopper, + bufferMemLimit: bufferMemLimit, + knobs: knobs, + } +} + +// SQLWatcher is the concrete implementation of spanconfig.SQLWatcher. It +// establishes rangefeeds over system.zones and system.descriptors to +// incrementally watch for SQL updates. +type SQLWatcher struct { + codec keys.SQLCodec + settings *cluster.Settings + rangeFeedFactory *rangefeed.Factory + stopper *stop.Stopper + + buffer *buffer + + knobs *spanconfig.TestingKnobs + + started int32 // accessed atomically. +} + +// sqlWatcherBufferEntrySize is the size of an entry stored in the sqlWatcher's +// buffer. We use this value to calculate the buffer capacity. +const sqlWatcherBufferEntrySize = int64(unsafe.Sizeof(event{}) + unsafe.Sizeof(rangefeedbuffer.Event(nil))) + +// New constructs a spanconfig.SQLWatcher. +func (f *Factory) New() spanconfig.SQLWatcher { + return &SQLWatcher{ + codec: f.codec, + settings: f.settings, + rangeFeedFactory: f.rangeFeedFactory, + stopper: f.stopper, + buffer: newBuffer(int(f.bufferMemLimit / sqlWatcherBufferEntrySize)), + knobs: f.knobs, + } +} + +// WatchForSQLUpdates is part of the spanconfig.SQLWatcher interface. +func (s *SQLWatcher) WatchForSQLUpdates( + ctx context.Context, + timestamp hlc.Timestamp, + handler func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error, +) error { + if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { + return errors.AssertionFailedf("watcher already started watching") + } + + // The callbacks below are invoked by both the rangefeeds we establish, both + // of which run on separate goroutines. To ensure calls to the handler + // function are serial we only ever run it on the main thread of + // WatchForSQLUpdates (instead of pushing it into the rangefeed callbacks). + // The rangefeed callbacks use channels to report errors and notifications to + // to flush events from the buffer. As WatchForSQLUpdate's main thread is the + // sole listener on these channels, doing expensive work in the handler + // function can lead to blocking the rangefeed, which isn't great. This is an + // unfortunate asterisk for users of this interface to be aware of. + // + // TODO(arul): Possibly get rid of this limitation by introducing another + // buffer interface here to store updates produced by the Watcher so that + // we can run the handler in a separate goroutine and still provide the + // serial semantics. + errCh := make(chan error) + frontierAdvanced := make(chan struct{}) + onFrontierAdvance := func(ctx context.Context, rangefeed rangefeedKind, timestamp hlc.Timestamp) { + s.buffer.advance(rangefeed, timestamp) + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is being closed, which + // happens after we've stopped listening on the frontierAdvancedCh. + case frontierAdvanced <- struct{}{}: + } + } + onEvent := func(ctx context.Context, event event) { + err := func() error { + if fn := s.knobs.SQLWatcherOnEventInterceptor; fn != nil { + if err := fn(); err != nil { + return err + } + } + return s.buffer.add(event) + }() + if err != nil { + log.Warningf(ctx, "error adding event %v: %v", event, err) + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is being closed, which + // happens after we've stopped listening on the errCh. + case errCh <- err: + } + } + } + + descriptorsRF, err := s.watchForDescriptorUpdates(ctx, timestamp, onEvent, onFrontierAdvance) + if err != nil { + return errors.Wrapf(err, "error establishing rangefeed over system.descriptors") + } + defer descriptorsRF.Close() + zonesRF, err := s.watchForZoneConfigUpdates(ctx, timestamp, onEvent, onFrontierAdvance) + if err != nil { + return errors.Wrapf(err, "error establishing rangefeed over system.zones") + } + defer zonesRF.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.stopper.ShouldQuiesce(): + return nil + case err = <-errCh: + return err + case <-frontierAdvanced: + events, combinedFrontierTS, err := s.buffer.flush(ctx) + if err != nil { + return err + } + if len(events) == 0 { + continue + } + if err := handler(ctx, events, combinedFrontierTS); err != nil { + return err + } + } + } +} + +// watchForDescriptorUpdates establishes a rangefeed over system.descriptors and +// invokes the onEvent callback for observed events. The onFrontierAdvance +// callback is invoked whenever the rangefeed frontier is advanced as well. +func (s *SQLWatcher) watchForDescriptorUpdates( + ctx context.Context, + timestamp hlc.Timestamp, + onEvent func(context.Context, event), + onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp), +) (*rangefeed.RangeFeed, error) { + descriptorTableStart := s.codec.TablePrefix(keys.DescriptorTableID) + descriptorTableSpan := roachpb.Span{ + Key: descriptorTableStart, + EndKey: descriptorTableStart.PrefixEnd(), + } + handleEvent := func(ctx context.Context, ev *roachpb.RangeFeedValue) { + if !ev.Value.IsPresent() && !ev.PrevValue.IsPresent() { + // Event for a tombstone on a tombstone -- nothing for us to do here. + return + } + value := ev.Value + if !ev.Value.IsPresent() { + // The descriptor was deleted. + value = ev.PrevValue + } + + var descriptor descpb.Descriptor + if err := value.GetProto(&descriptor); err != nil { + logcrash.ReportOrPanic( + ctx, + &s.settings.SV, + "%s: failed to unmarshal descriptor %v", + ev.Key, + value, + ) + return + } + if descriptor.Union == nil { + return + } + + table, database, typ, schema := descpb.FromDescriptorWithMVCCTimestamp(&descriptor, value.Timestamp) + + var id descpb.ID + var descType catalog.DescriptorType + switch { + case table != nil: + id = table.GetID() + descType = catalog.Table + case database != nil: + id = database.GetID() + descType = catalog.Database + case typ != nil: + id = typ.GetID() + descType = catalog.Type + case schema != nil: + id = schema.GetID() + descType = catalog.Schema + default: + logcrash.ReportOrPanic(ctx, &s.settings.SV, "unknown descriptor unmarshalled %v", descriptor) + } + + rangefeedEvent := event{ + timestamp: ev.Value.Timestamp, + update: spanconfig.DescriptorUpdate{ + ID: id, + DescriptorType: descType, + }, + } + onEvent(ctx, rangefeedEvent) + } + rf, err := s.rangeFeedFactory.RangeFeed( + ctx, + "sql-watcher-descriptor-rangefeed", + descriptorTableSpan, + timestamp, + handleEvent, + rangefeed.WithDiff(), + rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) { + onFrontierAdvance(ctx, descriptorsRangefeed, resolvedTS) + }), + ) + if err != nil { + return nil, err + } + + log.Infof(ctx, "established range feed over system.descriptors table starting at time %s", timestamp) + return rf, nil +} + +// watchForZoneConfigUpdates establishes a rangefeed over system.zones and +// invokes the onEvent callback whenever an event is observed. The +// onFrontierAdvance callback is also invoked whenever the rangefeed frontier is +// advanced. +func (s *SQLWatcher) watchForZoneConfigUpdates( + ctx context.Context, + timestamp hlc.Timestamp, + onEvent func(context.Context, event), + onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp), +) (*rangefeed.RangeFeed, error) { + zoneTableStart := s.codec.TablePrefix(keys.ZonesTableID) + zoneTableSpan := roachpb.Span{ + Key: zoneTableStart, + EndKey: zoneTableStart.PrefixEnd(), + } + + decoder := newZonesDecoder(s.codec) + handleEvent := func(ctx context.Context, ev *roachpb.RangeFeedValue) { + descID, err := decoder.DecodePrimaryKey(ev.Key) + if err != nil { + logcrash.ReportOrPanic( + ctx, + &s.settings.SV, + "sql watcher zones range feed error: %v", + err, + ) + return + } + + rangefeedEvent := event{ + timestamp: ev.Value.Timestamp, + update: spanconfig.DescriptorUpdate{ + ID: descID, + DescriptorType: catalog.Any, + }, + } + onEvent(ctx, rangefeedEvent) + } + rf, err := s.rangeFeedFactory.RangeFeed( + ctx, + "sql-watcher-zones-rangefeed", + zoneTableSpan, + timestamp, + handleEvent, + rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) { + onFrontierAdvance(ctx, zonesRangefeed, resolvedTS) + }), + ) + if err != nil { + return nil, err + } + + log.Infof(ctx, "established range feed over system.zones table starting at time %s", timestamp) + return rf, nil +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go new file mode 100644 index 000000000000..39e8dd9cd7f7 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go @@ -0,0 +1,382 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestSQLWatcherReactsToUpdates verifies that the SQLWatcher emits the correct +// updates following changes made to system.descriptor or system.zones. It also +// ensures that the timestamp supplied to the handler function is monotonically +// increasing. +func TestSQLWatcherReactsToUpdates(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCases := []struct { + setup string + stmt string + expectedIDs descpb.IDs + }{ + { + stmt: "CREATE TABLE t()", + expectedIDs: descpb.IDs{52}, + }, + { + setup: "CREATE TABLE t2()", + stmt: "ALTER TABLE t2 CONFIGURE ZONE USING num_replicas = 3", + expectedIDs: descpb.IDs{53}, + }, + { + setup: "CREATE DATABASE d; CREATE TABLE d.t1(); CREATE TABLE d.t2()", + stmt: "ALTER DATABASE d CONFIGURE ZONE USING num_replicas=5", + expectedIDs: descpb.IDs{54}, + }, + { + setup: "CREATE TABLE t3(); CREATE TABLE t4()", + stmt: "ALTER TABLE t3 CONFIGURE ZONE USING num_replicas=5; CREATE TABLE t5(); DROP TABLE t4;", + expectedIDs: descpb.IDs{57, 58, 59}, + }, + // Named zone tests. + { + stmt: "ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.RootNamespaceID}, + }, + { + stmt: "ALTER RANGE liveness CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.LivenessRangesID}, + }, + { + stmt: "ALTER RANGE meta CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.MetaRangesID}, + }, + { + stmt: "ALTER RANGE system CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.SystemRangesID}, + }, + { + stmt: "ALTER RANGE timeseries CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.TimeseriesRangesID}, + }, + // Test that events on types/schemas are also captured. + { + setup: "CREATE DATABASE db", + stmt: "CREATE SCHEMA db.sc", + // One ID each for the parent database and the schema. + expectedIDs: descpb.IDs{60, 61}, + }, + { + stmt: "CREATE TYPE typ AS ENUM()", + // One ID each for the enum and the array type. + expectedIDs: descpb.IDs{62, 63}, + }, + } + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + + ts := tc.Server(0 /* idx */) + + sqlDB := tc.ServerConn(0 /* idx */) + for _, tc := range testCases { + sqlWatcher := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + nil, /* knobs */ + ).New() + + _, err := sqlDB.Exec(tc.setup) + require.NoError(t, err) + + // Save the startTS here before the test case is executed to ensure the + // watcher can pick up the change when we start it in a separate goroutine. + startTS := ts.Clock().Now() + + var mu struct { + syncutil.Mutex + receivedIDs map[descpb.ID]struct{} + } + mu.receivedIDs = make(map[descpb.ID]struct{}) + + watcherCtx, watcherCancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + prevCheckpointTS := startTS + _ = sqlWatcher.WatchForSQLUpdates(watcherCtx, + startTS, + func(ctx context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error { + require.True(t, prevCheckpointTS.Less(checkpointTS)) + mu.Lock() + defer mu.Unlock() + for _, update := range updates { + mu.receivedIDs[update.ID] = struct{}{} + } + prevCheckpointTS = checkpointTS + return nil + }) + }() + + _, err = sqlDB.Exec(tc.stmt) + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + mu.Lock() + defer mu.Unlock() + if len(mu.receivedIDs) == len(tc.expectedIDs) { + return nil + } + return errors.Newf("expected to receive %d IDs, but found %d", len(tc.expectedIDs), len(mu.receivedIDs)) + }) + + // Rangefeed events aren't guaranteed to be in any particular order for + // different keys. + mu.Lock() + for _, id := range tc.expectedIDs { + _, seen := mu.receivedIDs[id] + require.True(t, seen) + } + mu.Unlock() + + // Stop the watcher and wait for the goroutine to complete. + watcherCancel() + wg.Wait() + } +} + +// TestSQLWatcherFactory tests that the SQLWatcherFactory can create multiple +// SQLWatchers and that every SQLWatcher is only good for a single +// WatchForSQLUpdates. +func TestSQLWatcherFactory(t *testing.T) { + defer leaktest.AfterTest(t)() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. + }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0 /* idx */) + sqlDB := tc.ServerConn(0 /* idx */) + + sqlWatcherFactory := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + nil, /* knobs */ + ) + + startTS := ts.Clock().Now() + _, err := sqlDB.Exec("CREATE TABLE t()") + require.NoError(t, err) + + sqlWatcher := sqlWatcherFactory.New() + + var wg sync.WaitGroup + + watcherCtx, watcherCancel := context.WithCancel(ctx) + wg.Add(1) + go func() { + defer wg.Done() + _ = sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Error("handler should never run") + return nil + }) + }() + + watcherCancel() + wg.Wait() + + err = sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Fatal("handler should never run") + return nil + }) + require.Error(t, err) + require.True(t, testutils.IsError(err, "watcher already started watching")) + + newSQLWatcher := sqlWatcherFactory.New() + newWatcherCtx, newWatcherCancel := context.WithCancel(ctx) + + wg.Add(1) + go func() { + defer wg.Done() + err = newSQLWatcher.WatchForSQLUpdates(newWatcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Error("handler should never run") + return nil + }) + require.True(t, testutils.IsError(err, "context canceled")) + }() + + newWatcherCancel() + wg.Wait() +} + +// TestSQLWatcherOnEventError ensures that if there is an error processing a +// rangefeed event the handler is never run and the error is bubbled back to +// the caller. +func TestSQLWatcherOnEventError(t *testing.T) { + defer leaktest.AfterTest(t)() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. + }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0 /* idx */) + sqlDB := tc.ServerConn(0 /* idx */) + + sqlWatcherFactory := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + &spanconfig.TestingKnobs{ + SQLWatcherOnEventInterceptor: func() error { + return errors.New("boom") + }, + }, + ) + + startTS := ts.Clock().Now() + _, err := sqlDB.Exec("CREATE TABLE t()") + require.NoError(t, err) + + sqlWatcher := sqlWatcherFactory.New() + err = sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Fatal("handler should never run") + return nil + }) + require.Error(t, err) + require.True(t, testutils.IsError(err, "boom")) +} + +// TestSQLWatcherHandlerError ensures that no further calls are made to the +// handler after it returns an error. +func TestSQLWatcherHandlerError(t *testing.T) { + defer leaktest.AfterTest(t)() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. + }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0 /* idx */) + sqlDB := tc.ServerConn(0 /* idx */) + + sqlWatcher := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + nil, /* knobs */ + ).New() + + _, err := sqlDB.Exec("CREATE TABLE t()") + require.NoError(t, err) + + stopTraffic := make(chan struct{}) + startTS := ts.Clock().Now() + + var wg sync.WaitGroup + wg.Add(1) + + // Start a background thread that modifies zone configs on a table to trigger + // events on system.zones. + go func() { + defer wg.Done() + for { + select { + case <-stopTraffic: + return + default: + } + _, err = sqlDB.Exec("ALTER TABLE t CONFIGURE ZONE USING num_replicas=5") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + } + }() + + var numCalled int32 + // Wrap the call to WatchForSQLUpdates in a SucceedsSoon to ensure it + // evaluates within 45 seconds. + testutils.SucceedsSoon(t, func() error { + err := sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + atomic.AddInt32(&numCalled, 1) + return errors.New("handler error") + }) + require.Error(t, err) + require.True(t, testutils.IsError(err, "handler error")) + return nil + }) + + // Shutdown the background goroutine. + close(stopTraffic) + wg.Wait() + + // Ensure that the handler was called only once. + require.Equal(t, int32(1), atomic.LoadInt32(&numCalled)) +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go new file mode 100644 index 000000000000..7d27e77b91d3 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher + +import ( + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +// zonesDecoder decodes the zone ID (primary key) of rows from system.zones. +// It's not safe for concurrent use. +type zonesDecoder struct { + alloc rowenc.DatumAlloc + codec keys.SQLCodec +} + +// newZonesDecoder instantiates a zonesDecoder. +func newZonesDecoder(codec keys.SQLCodec) *zonesDecoder { + return &zonesDecoder{ + codec: codec, + } +} + +// DecodePrimaryKey decodes the primary key (zone ID) from the system.zones +// table. +func (zd *zonesDecoder) DecodePrimaryKey(key roachpb.Key) (descpb.ID, error) { + // Decode the descriptor ID from the key. + tbl := systemschema.ZonesTable + types := []*types.T{tbl.PublicColumns()[0].GetType()} + startKeyRow := make([]rowenc.EncDatum, 1) + _, matches, _, err := rowenc.DecodeIndexKey( + zd.codec, types, startKeyRow, nil /* colDirs */, key, + ) + if err != nil || !matches { + return descpb.InvalidID, errors.AssertionFailedf("failed to decode key in system.zones %v", key) + } + if err := startKeyRow[0].EnsureDecoded(types[0], &zd.alloc); err != nil { + return descpb.InvalidID, errors.AssertionFailedf("failed to decode key in system.zones %v", key) + } + descID := descpb.ID(tree.MustBeDInt(startKeyRow[0].Datum)) + return descID, nil +} + +// TestingZonesDecoderDecodePrimaryKey constructs a zonesDecoder using the given +// codec and decodes the supplied key using it. This wrapper is exported for +// testing purposes to ensure the struct remains private. +func TestingZonesDecoderDecodePrimaryKey(codec keys.SQLCodec, key roachpb.Key) (descpb.ID, error) { + return newZonesDecoder(codec).DecodePrimaryKey(key) +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder_test.go b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder_test.go new file mode 100644 index 000000000000..3a009b072a16 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder_test.go @@ -0,0 +1,108 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher_test + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" +) + +// TestZoneDecoderDecodePrimaryKey verifies that we can decode the primary key +// stored in a system.zones like table. +func TestZonesDecoderDecodePrimaryKey(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + // Create a dummy table, like system.zones, to modify in this test. This lets + // us test things without bother with the prepoulated contents for + // system.zones. + // + // Note that system.zones has two column families (for legacy) reasons, but + // the table dummy table constructed below does not. This shouldn't matter + // as the decoder is only decoding the primary key in this test, which doesn't + // change across different column families. + const dummyTableName = "dummy_zones" + sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.zones INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + sqlDB.QueryRow( + t, + fmt.Sprintf("SELECT id FROM system.namespace WHERE name='%s'", dummyTableName), + ).Scan(&dummyTableID) + + k := keys.SystemSQLCodec.TablePrefix(dummyTableID) + + entries := []struct { + id descpb.ID + proto zonepb.ZoneConfig + }{ + { + id: 50, + proto: zonepb.ZoneConfig{}, + }, + { + id: 55, + proto: zonepb.DefaultZoneConfig(), + }, + { + id: 60, + proto: zonepb.ZoneConfig{ + NumReplicas: proto.Int32(5), + }, + }, + } + + for _, entry := range entries { + buf, err := protoutil.Marshal(&entry.proto) + require.NoError(t, err) + + _ = sqlDB.Exec( + t, fmt.Sprintf("UPSERT INTO %s (id, config) VALUES ($1, $2) ", dummyTableName), entry.id, buf, + ) + require.NoError(t, err) + } + + rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + require.Equal(t, len(entries), len(rows)) + + for i, row := range rows { + got, err := spanconfigsqlwatcher.TestingZonesDecoderDecodePrimaryKey(keys.SystemSQLCodec, row.Key) + require.NoError(t, err) + require.Equal(t, entries[i].id, got) + } +} diff --git a/pkg/spanconfig/testing_knobs.go b/pkg/spanconfig/testing_knobs.go index acde9d9f34b6..38b9489c02e1 100644 --- a/pkg/spanconfig/testing_knobs.go +++ b/pkg/spanconfig/testing_knobs.go @@ -53,6 +53,10 @@ type TestingKnobs struct { // StoreKVSubscriberOverride is used to override the KVSubscriber used when // setting up a new store. StoreKVSubscriberOverride KVSubscriber + + // SQLWatcherOnEventInterceptor, if set, is invoked when the SQLWatcher + // receives an event on one of its rangefeeds. + SQLWatcherOnEventInterceptor func() error } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 8aca2cb17a41..5bc6701c3a16 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -122,6 +122,10 @@ type TestServerInterface interface { // an interface{}. SpanConfigSQLTranslator() interface{} + // SpanConfigSQLWatcherFactory returns the underlying + // spanconfig.SQLWatcherFactory as an interface{}. + SpanConfigSQLWatcherFactory() interface{} + // SQLServer returns the *sql.Server as an interface{}. SQLServer() interface{}