From 24465b8c3fa7eadc4dfb9f5e0a1c29a9ecf96b2e Mon Sep 17 00:00:00 2001 From: arulajmani Date: Tue, 31 Aug 2021 14:18:31 -0400 Subject: [PATCH] spanconfig: introduce the spanconfig.SQLWatcher This patch introduces the SQLWatcher, which is intended to incrementally watch for updates to system.zones and system.descriptors. It does so by establishing rangefeeds at a given timestamp. The SQLWatcher invokes a callback from time to time with a list of updates that have been observed in the window (previous checkpointTS, checkpointTS]. The checkpointTS is also provided to the callback. Internally, the SQLWatcher uses a buffer to keep track of events generated by the SQLWatcher's rangefeeds. It also tracks the individual frontier timestamps of both the rangefeeds. This helps to maintain the notion of the combined frontier timestamp, which is computed as the minimum of the two. This combined frontier timestamp serves as the checkpoint to the SQLWatcher's callback function. This interface isn't hooked up to anything yet. It'll be used by the spanconfig.Reconciler soon to perform partial reconciliation once full reconciliation is done. It is intended that the IDs from the updates produced by the SQLWatcher will be fed into the SQLTranslator. 
References #67679 Carved from #69661 Release note: None --- pkg/BUILD.bazel | 1 + .../changefeedccl/helpers_tenant_shim_test.go | 53 +-- .../rangefeed/rangefeedbuffer/buffer.go | 2 +- .../rangefeed/rangefeedbuffer/buffer_test.go | 26 +- pkg/server/BUILD.bazel | 1 + pkg/server/server_sql.go | 10 + pkg/server/testserver.go | 10 + pkg/spanconfig/BUILD.bazel | 1 + pkg/spanconfig/spanconfig.go | 63 ++- .../spanconfigkvsubscriber/kvsubscriber.go | 2 +- pkg/spanconfig/spanconfigmanager/manager.go | 19 +- .../spanconfigmanager/manager_test.go | 3 + .../spanconfigsqlwatcher/BUILD.bazel | 67 +++ pkg/spanconfig/spanconfigsqlwatcher/buffer.go | 159 ++++++++ .../spanconfigsqlwatcher/buffer_test.go | 235 +++++++++++ .../spanconfigsqlwatcher/main_test.go | 31 ++ .../spanconfigsqlwatcher/sqlwatcher.go | 335 +++++++++++++++ .../spanconfigsqlwatcher/sqlwatcher_test.go | 382 ++++++++++++++++++ .../spanconfigsqlwatcher/zonesdecoder.go | 63 +++ .../spanconfigsqlwatcher/zonesdecoder_test.go | 108 +++++ pkg/spanconfig/testing_knobs.go | 4 + pkg/testutils/serverutils/test_server_shim.go | 4 + 22 files changed, 1523 insertions(+), 56 deletions(-) create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/buffer.go create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/main_test.go create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go create mode 100644 pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 5e4a61d178d1..42bbe62d16aa 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -185,6 +185,7 @@ ALL_TESTS = [ "//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test", "//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test", 
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test", + "//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test", "//pkg/spanconfig/spanconfigstore:spanconfigstore_test", "//pkg/spanconfig/spanconfigtestutils:spanconfigtestutils_test", "//pkg/sql/catalog/catalogkeys:catalogkeys_test", diff --git a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go index 5f8617371a26..3ce3711ef227 100644 --- a/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go +++ b/pkg/ccl/changefeedccl/helpers_tenant_shim_test.go @@ -51,32 +51,33 @@ func (t *testServerShim) ServingSQLAddr() string { return t.SQLAddr() } -func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) } -func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) } -func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) } -func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) } -func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) } -func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) } -func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) } -func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) } -func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) GossipI() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) } 
-func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) } -func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) } -func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) Stopper() *stop.Stopper { panic(unsupportedShimMethod) } +func (t *testServerShim) Start(context.Context) error { panic(unsupportedShimMethod) } +func (t *testServerShim) Node() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) NodeID() roachpb.NodeID { panic(unsupportedShimMethod) } +func (t *testServerShim) ClusterID() uuid.UUID { panic(unsupportedShimMethod) } +func (t *testServerShim) ServingRPCAddr() string { panic(unsupportedShimMethod) } +func (t *testServerShim) RPCAddr() string { panic(unsupportedShimMethod) } +func (t *testServerShim) DB() *kv.DB { panic(unsupportedShimMethod) } +func (t *testServerShim) RPCContext() *rpc.Context { panic(unsupportedShimMethod) } +func (t *testServerShim) LeaseManager() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) InternalExecutor() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) ExecutorConfig() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) TracerI() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) 
GossipI() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) RangeFeedFactory() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) Clock() *hlc.Clock { panic(unsupportedShimMethod) } +func (t *testServerShim) DistSenderI() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) MigrationServer() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SpanConfigAccessor() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SpanConfigSQLTranslator() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SpanConfigSQLWatcherFactory() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SQLServer() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) SQLLivenessProvider() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) StartupMigrationsManager() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) NodeLiveness() interface{} { panic(unsupportedShimMethod) } +func (t *testServerShim) HeartbeatNodeLiveness() error { panic(unsupportedShimMethod) } +func (t *testServerShim) NodeDialer() interface{} { panic(unsupportedShimMethod) } func (t *testServerShim) SetDistSQLSpanResolver(spanResolver interface{}) { panic(unsupportedShimMethod) } diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go index bc54e4b9d2b0..86045b957cc2 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go @@ -50,7 +50,7 @@ func New(limit int) *Buffer { } // Add adds the given entry to the buffer. 
-func (b *Buffer) Add(ctx context.Context, ev Event) error { +func (b *Buffer) Add(ev Event) error { b.mu.Lock() defer b.mu.Unlock() diff --git a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go index f729a96705e3..5188beae8e36 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer_test.go @@ -48,11 +48,11 @@ func TestBuffer(t *testing.T) { } { // Flushing at a timestamp lower than buffered events should return nothing. - require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13 - require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11 - require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15 - require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12 - require.NoError(t, buffer.Add(ctx, makeEvent("e", ts(18)))) // e@18 + require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13 + require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11 + require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15 + require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12 + require.NoError(t, buffer.Add(makeEvent("e", ts(18)))) // e@18 events := buffer.Flush(ctx, ts(10)) require.True(t, len(events) == 0) @@ -75,18 +75,18 @@ func TestBuffer(t *testing.T) { } { // Adding events with timestamps <= the last flush are discarded. 
- require.NoError(t, buffer.Add(ctx, makeEvent("a", ts(13)))) // a@13 - require.NoError(t, buffer.Add(ctx, makeEvent("b", ts(11)))) // b@11 - require.NoError(t, buffer.Add(ctx, makeEvent("c", ts(15)))) // c@15 - require.NoError(t, buffer.Add(ctx, makeEvent("d", ts(12)))) // d@12 + require.NoError(t, buffer.Add(makeEvent("a", ts(13)))) // a@13 + require.NoError(t, buffer.Add(makeEvent("b", ts(11)))) // b@11 + require.NoError(t, buffer.Add(makeEvent("c", ts(15)))) // c@15 + require.NoError(t, buffer.Add(makeEvent("d", ts(12)))) // d@12 events := buffer.Flush(ctx, ts(15)) require.True(t, len(events) == 0) } { // Additional events are flushed out at appropriate points. - require.NoError(t, buffer.Add(ctx, makeEvent("f", ts(19)))) // f@19 - require.NoError(t, buffer.Add(ctx, makeEvent("g", ts(21)))) // g@21 + require.NoError(t, buffer.Add(makeEvent("f", ts(19)))) // f@19 + require.NoError(t, buffer.Add(makeEvent("g", ts(21)))) // g@21 events := buffer.Flush(ctx, ts(20)) require.True(t, len(events) == 2) @@ -107,10 +107,10 @@ func TestBuffer(t *testing.T) { { // Ensure that buffer limits are respected. 
for i := 0; i < limit; i++ { - require.NoError(t, buffer.Add(ctx, makeEvent("x", ts(101)))) // x@101 + require.NoError(t, buffer.Add(makeEvent("x", ts(101)))) // x@101 } - err := buffer.Add(ctx, makeEvent("x", ts(101))) + err := buffer.Add(makeEvent("x", ts(101))) require.ErrorIs(t, err, rangefeedbuffer.ErrBufferLimitExceeded) } } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 80ac4fd11f22..1c244a07c225 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -116,6 +116,7 @@ go_library( "//pkg/spanconfig/spanconfigkvsubscriber", "//pkg/spanconfig/spanconfigmanager", "//pkg/spanconfig/spanconfigsqltranslator", + "//pkg/spanconfig/spanconfigsqlwatcher", "//pkg/sql", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index f3177deb2eee..9ed9363567ae 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -53,6 +53,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigmanager" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqltranslator" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" @@ -842,6 +843,14 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { // only do it if COCKROACH_EXPERIMENTAL_SPAN_CONFIGS is set. 
spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) sqlTranslator := spanconfigsqltranslator.New(execCfg, codec) + sqlWatcherFactory := spanconfigsqlwatcher.NewFactory( + codec, + cfg.Settings, + cfg.rangeFeedFactory, + 1<<20, /* 1 MB bufferMemLimit */ + cfg.stopper, + spanConfigKnobs, + ) spanConfigMgr = spanconfigmanager.New( cfg.db, jobRegistry, @@ -849,6 +858,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.stopper, cfg.Settings, cfg.spanConfigAccessor, + sqlWatcherFactory, sqlTranslator, spanConfigKnobs, ) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 9b71c7d6f5ee..03ea76d8c832 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -961,6 +961,16 @@ func (ts *TestServer) SpanConfigSQLTranslator() interface{} { return ts.sqlServer.spanconfigMgr.SQLTranslator } +// SpanConfigSQLWatcherFactory is part of TestServerInterface. +func (ts *TestServer) SpanConfigSQLWatcherFactory() interface{} { + if ts.sqlServer.spanconfigMgr == nil { + panic( + "span config manager uninitialized; see EnableSpanConfigs testing knob to use span configs", + ) + } + return ts.sqlServer.spanconfigMgr.SQLWatcherFactory +} + // SQLServer is part of TestServerInterface. 
func (ts *TestServer) SQLServer() interface{} { return ts.PGServer().SQLServer diff --git a/pkg/spanconfig/BUILD.bazel b/pkg/spanconfig/BUILD.bazel index 6f3d8fc5a0b0..b85ee3515697 100644 --- a/pkg/spanconfig/BUILD.bazel +++ b/pkg/spanconfig/BUILD.bazel @@ -12,6 +12,7 @@ go_library( "//pkg/base", "//pkg/keys", "//pkg/roachpb:with-mocks", + "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/util/hlc", ], diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 97e4c6dc7f72..469ca83a56aa 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -117,6 +118,59 @@ func FullTranslate( return s.Translate(ctx, descpb.IDs{keys.RootNamespaceID}) } +// SQLWatcher watches for events on system.zones and system.descriptors. +type SQLWatcher interface { + // WatchForSQLUpdates watches for updates to zones and descriptors starting at + // the given timestamp (exclusive), informing callers using the handler + // callback. + // + // The handler callback[1] is invoked from time to time with a list of updates + // and a checkpointTS. Invocations of the handler callback provide the + // following semantics: + // 1. Calls to the handler are serial. + // 2. The timestamp supplied to the handler is monotonically increasing. + // 3. The list of DescriptorUpdates supplied to handler includes all events + // in the window (prevInvocationCheckpointTS, checkpointTS]. + // 4. No further calls to the handler are made if a call to the handler + // returns an error. + // + // These guarantees mean that users of this interface are free to persist the + // checkpointTS and later use it to re-establish a SQLWatcher without missing + // any updates. 
+ // + // WatchForSQLUpdates can only ever be called once, effectively making the + // SQLWatcher a single use interface. + // + // WatchForSQLUpdates may run out of memory and return an error if it is + // tracking too many events between two checkpoints. + // + // [1] Users of this interface should not intend to do expensive work in the + // handler callback. + // TODO(arul): Possibly get rid of this limitation. + WatchForSQLUpdates( + ctx context.Context, + startTS hlc.Timestamp, + handler func(ctx context.Context, updates []DescriptorUpdate, checkpointTS hlc.Timestamp) error, + ) error +} + +// DescriptorUpdate captures the ID and type of a descriptor or zone that the +// SQLWatcher has observed updated. +type DescriptorUpdate struct { + // ID of the descriptor/zone that has been updated. + ID descpb.ID + + // DescriptorType of the descriptor/zone that has been updated. Could be either + // the specific type or catalog.Any if no information is available. + DescriptorType catalog.DescriptorType +} + +// SQLWatcherFactory is used to construct new SQLWatchers. +type SQLWatcherFactory interface { + // New returns a new SQLWatcher. + New() SQLWatcher +} + // ReconciliationDependencies captures what's needed by the span config // reconciliation job to perform its task. The job is responsible for // reconciling a tenant's zone configurations with the clusters span @@ -126,11 +180,7 @@ type ReconciliationDependencies interface { SQLTranslator - // TODO(arul): We'll also want access to a "SQLWatcher", something that - // watches for changes to system.{descriptors, zones} to feed IDs to the - // SQLTranslator. These interfaces will be used by the "Reconciler to perform - // full/partial reconciliation, checkpoint the span config job, and update KV - // with the tenants span config state. 
+ SQLWatcherFactory } // Store is a data structure used to store spans and their corresponding @@ -235,8 +285,7 @@ func (u Update) Deletion() bool { return u.Config.IsEmpty() } -// Addition returns true if the update corresponds to a span config being -// added. +// Addition returns true if the update corresponds to a span config being added. func (u Update) Addition() bool { return !u.Deletion() } diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go index 77fca41a25c1..94405b6fa495 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go @@ -260,7 +260,7 @@ func (s *KVSubscriber) run(ctx context.Context) error { update.Config = entry.Config } - if err := buffer.Add(ctx, &bufferEvent{update, ev.Value.Timestamp}); err != nil { + if err := buffer.Add(&bufferEvent{update, ev.Value.Timestamp}); err != nil { select { case <-ctx.Done(): // The context is canceled when the rangefeed is closed by the diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index e4130f722d14..010fe2e8d9de 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -60,6 +60,7 @@ type Manager struct { knobs *spanconfig.TestingKnobs spanconfig.KVAccessor + spanconfig.SQLWatcherFactory spanconfig.SQLTranslator } @@ -73,6 +74,7 @@ func New( stopper *stop.Stopper, settings *cluster.Settings, kvAccessor spanconfig.KVAccessor, + sqlWatcherFactory spanconfig.SQLWatcherFactory, sqlTranslator spanconfig.SQLTranslator, knobs *spanconfig.TestingKnobs, ) *Manager { @@ -80,14 +82,15 @@ func New( knobs = &spanconfig.TestingKnobs{} } return &Manager{ - db: db, - jr: jr, - ie: ie, - stopper: stopper, - settings: settings, - KVAccessor: kvAccessor, - SQLTranslator: sqlTranslator, - knobs: knobs, + db: db, + jr: jr, + ie: ie, + stopper: stopper, + settings: settings, + 
KVAccessor: kvAccessor, + SQLWatcherFactory: sqlWatcherFactory, + SQLTranslator: sqlTranslator, + knobs: knobs, } } diff --git a/pkg/spanconfig/spanconfigmanager/manager_test.go b/pkg/spanconfig/spanconfigmanager/manager_test.go index a8b0a7bd7ab2..067befba1464 100644 --- a/pkg/spanconfig/spanconfigmanager/manager_test.go +++ b/pkg/spanconfig/spanconfigmanager/manager_test.go @@ -74,6 +74,7 @@ func TestManagerConcurrentJobCreation(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory), ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), &spanconfig.TestingKnobs{ ManagerCreatedJobInterceptor: func(jobI interface{}) { @@ -162,6 +163,7 @@ func TestManagerStartsJobIfFailed(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory), ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), &spanconfig.TestingKnobs{ ManagerAfterCheckedReconciliationJobExistsInterceptor: func(exists bool) { @@ -237,6 +239,7 @@ func TestManagerCheckJobConditions(t *testing.T) { ts.Stopper(), ts.ClusterSettings(), ts.SpanConfigAccessor().(spanconfig.KVAccessor), + ts.SpanConfigSQLWatcherFactory().(spanconfig.SQLWatcherFactory), ts.SpanConfigSQLTranslator().(spanconfig.SQLTranslator), &spanconfig.TestingKnobs{ ManagerDisableJobCreation: true, diff --git a/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel new file mode 100644 index 000000000000..bb47eba6a7c7 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel @@ -0,0 +1,67 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "spanconfigsqlwatcher", + srcs = [ + "buffer.go", + "sqlwatcher.go", + "zonesdecoder.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher", + visibility = 
["//visibility:public"], + deps = [ + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/kv/kvclient/rangefeed/rangefeedbuffer", + "//pkg/roachpb:with-mocks", + "//pkg/settings/cluster", + "//pkg/spanconfig", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/systemschema", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", + "//pkg/sql/types", + "//pkg/util/hlc", + "//pkg/util/log", + "//pkg/util/log/logcrash", + "//pkg/util/stop", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "spanconfigsqlwatcher_test", + srcs = [ + "buffer_test.go", + "main_test.go", + "sqlwatcher_test.go", + "zonesdecoder_test.go", + ], + embed = [":spanconfigsqlwatcher"], + deps = [ + "//pkg/base", + "//pkg/config/zonepb", + "//pkg/jobs", + "//pkg/keys", + "//pkg/kv/kvclient/rangefeed:with-mocks", + "//pkg/security", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/spanconfig", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/protoutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_gogo_protobuf//proto", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go new file mode 100644 index 000000000000..8a50ecd2cba1 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer.go @@ -0,0 +1,159 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package spanconfigsqlwatcher + +import ( + "context" + "sort" + + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" +) + +// buffer is a helper struct for the SQLWatcher. It buffers events generated by +// the SQLWatcher's rangefeeds over system.zones and system.descriptors. It is +// safe for concurrent use. +// +// The buffer tracks frontier timestamps for both these rangefeeds as well. It +// maintains the notion of the combined frontier timestamp computed as the +// minimum of the two. This is used when flushing the buffer periodically. +type buffer struct { + mu struct { + syncutil.Mutex + + // rangefeed.Buffer stores spanconfigsqlwatcher.Events. + buffer *rangefeedbuffer.Buffer + + // rangefeedFrontiers tracks the frontier timestamps of individual + // rangefeeds established by the SQLWatcher. + rangefeedFrontiers [numRangefeeds]hlc.Timestamp + } +} + +// event is the unit produced by the rangefeeds the SQLWatcher establishes over +// system.zones and system.descriptors. It implements the rangefeedbuffer.Event +// interface. +type event struct { + // timestamp at which the event was generated by the rangefeed. + timestamp hlc.Timestamp + + // update captures information about the descriptor or zone that the + // SQLWatcher has observed change. + update spanconfig.DescriptorUpdate +} + +// Timestamp implements the rangefeedbuffer.Event interface. +func (e event) Timestamp() hlc.Timestamp { + return e.timestamp +} + +// rangefeedKind is used to identify the distinct rangefeeds {descriptors, +// zones} established by the SQLWatcher. +type rangefeedKind int + +const ( + zonesRangefeed rangefeedKind = iota + descriptorsRangefeed + + // numRangefeeds should be listed last. 
+ numRangefeeds int = iota +) + +// newBuffer constructs and returns a new buffer. +func newBuffer(limit int) *buffer { + rangefeedBuffer := rangefeedbuffer.New(limit) + eventBuffer := &buffer{} + eventBuffer.mu.buffer = rangefeedBuffer + return eventBuffer +} + +// advance advances the frontier for the given rangefeed. +func (b *buffer) advance(rangefeed rangefeedKind, timestamp hlc.Timestamp) { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.rangefeedFrontiers[rangefeed].Forward(timestamp) +} + +// add records the given event in the buffer. +func (b *buffer) add(ev event) error { + b.mu.Lock() + defer b.mu.Unlock() + return b.mu.buffer.Add(ev) +} + +// flushEvents computes the combined frontier timestamp of the buffer and +// returns a list of relevant events which were buffered up to that timestamp. +func (b *buffer) flushEvents( + ctx context.Context, +) (updates []rangefeedbuffer.Event, combinedFrontierTS hlc.Timestamp) { + b.mu.Lock() + defer b.mu.Unlock() + // First we determine the checkpoint timestamp, which is the minimum + // checkpoint timestamp of all event types. + combinedFrontierTS = hlc.MaxTimestamp + for _, ts := range b.mu.rangefeedFrontiers { + combinedFrontierTS.Backward(ts) + } + + return b.mu.buffer.Flush(ctx, combinedFrontierTS), combinedFrontierTS +} + +// flush computes the combined frontier timestamp of the buffer and returns a +// list of unique spanconfig.DescriptorUpdates below this timestamp. The +// combined frontier timestamp is also returned. 
+func (b *buffer) flush( + ctx context.Context, +) (updates []spanconfig.DescriptorUpdate, _ hlc.Timestamp, _ error) { + events, combinedFrontierTS := b.flushEvents(ctx) + sort.Slice(events, func(i, j int) bool { + ei, ej := events[i].(event), events[j].(event) + if ei.update.ID == ej.update.ID { + return ei.timestamp.Less(ej.timestamp) + } + return ei.update.ID < ej.update.ID + }) + for i, ev := range events { + if i == 0 || events[i-1].(event).update.ID != ev.(event).update.ID { + updates = append(updates, ev.(event).update) + continue + } + descType, err := combine(updates[len(updates)-1].DescriptorType, ev.(event).update.DescriptorType) + if err != nil { + return nil, hlc.Timestamp{}, err + } + updates[len(updates)-1].DescriptorType = descType + } + return updates, combinedFrontierTS, nil +} + +// combine takes two catalog.DescriptorTypes and combines them according to the +// following semantics: +// - Any can combine with any concrete descriptor type (including itself). +// Concrete descriptor types are {Table,Database,Schema,Type} descriptor types. +// - Concrete descriptor types can combine with themselves. +// - A concrete descriptor type cannot combine with another concrete descriptor +// type. +func combine(d1 catalog.DescriptorType, d2 catalog.DescriptorType) (catalog.DescriptorType, error) { + if d1 == d2 { + return d1, nil + } + if d1 == catalog.Any { + return d2, nil + } + if d2 == catalog.Any { + return d1, nil + } + return catalog.Any, errors.AssertionFailedf("cannot combine %s and %s", d1, d2) +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go b/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go new file mode 100644 index 000000000000..1005940394b7 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/buffer_test.go @@ -0,0 +1,235 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +// TestBuffer adds DescriptorUpdate events to the buffer and ensures that +// checkpointing and ID flushing semantics work correctly. +func TestBuffer(t *testing.T) { + defer leaktest.AfterTest(t)() + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: int64(nanos), + } + } + makeEvent := func(descID int, timestamp hlc.Timestamp) event { + return event{ + update: spanconfig.DescriptorUpdate{ + ID: descpb.ID(descID), + DescriptorType: catalog.Any, + }, + timestamp: timestamp, + } + } + makeUpdates := func(descIDs ...int) []spanconfig.DescriptorUpdate { + updates := make([]spanconfig.DescriptorUpdate, 0, len(descIDs)) + for _, descID := range descIDs { + updates = append(updates, spanconfig.DescriptorUpdate{ + ID: descpb.ID(descID), + DescriptorType: catalog.Any, + }) + } + return updates + } + + ctx := context.Background() + buffer := newBuffer(10 /* limit */) + + // Sanity check the newly initialized event buffer. + updates, combinedFrontierTS, err := buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(0), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Add a few events without advancing any of the frontiers. We don't expect + // anything to be returned by the call to flush yet. 
+ err = buffer.add(makeEvent(1, ts(10))) + require.NoError(t, err) + + err = buffer.add(makeEvent(2, ts(11))) + require.NoError(t, err) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(0), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Advance the zones frontier. We expect flush to still not return any results + // as the descriptors frontier hasn't been advanced yet. + buffer.advance(zonesRangefeed, ts(11)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(0), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Advance the descriptors frontier to a lower timestamp than the zones + // frontier above. Flush should now return the lower timestamp as the + // combinedFrontierTS. Furthermore, we only expect one id to be returned. + buffer.advance(descriptorsRangefeed, ts(10)) + require.NoError(t, err) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(10), combinedFrontierTS) + require.Equal(t, makeUpdates(1), updates) + + // Bump the descriptors frontier past the zones frontier. This should bump the + // combinedFrontierTS of the buffer to 11, resulting in flush returning the + // last ID in the buffer. + buffer.advance(descriptorsRangefeed, ts(20)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.Equal(t, makeUpdates(2), updates) + + // No updates are left in the buffer below the combined frontier (which hasn't + // changed from above). + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Try regressing the zones frontier by advancing it to a timestamp below what + // it was previously advanced to (11). 
This should essentially no-op, which we + // can check by looking at the combinedFrontierTS from the call to flush. + buffer.advance(zonesRangefeed, ts(5)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Try adding an event at a TS below the combinedFrontierTS. This should no-op + // as well and the ID should not be returned when flushing. + err = buffer.add(makeEvent(1, ts(5))) + require.NoError(t, err) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(11), combinedFrontierTS) + require.True(t, len(updates) == 0) + + // Lastly, ensure that flushing doesn't return duplicate IDs even when more + // than one update has been added for a given ID. + err = buffer.add(makeEvent(1, ts(12))) + require.NoError(t, err) + err = buffer.add(makeEvent(1, ts(13))) + require.NoError(t, err) + err = buffer.add(makeEvent(2, ts(14))) + require.NoError(t, err) + buffer.advance(zonesRangefeed, ts(14)) + updates, combinedFrontierTS, err = buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(14), combinedFrontierTS) + require.Equal(t, makeUpdates(1, 2), updates) +} + +// TestBufferCombinesDescriptorTypes ensures that the descriptor type of the +// same ID is combined correctly when flushing. It also ensures that the +// semantics around flushing are sound, in particular: +// - Type catalog.Any can combine with any concrete descriptor type +// (and itself). +// - A concrete descriptor type can combine with itself. +// - A concrete descriptor type cannot combine with a different concrete +// descriptor type.
+func TestBufferCombinesDescriptorTypes(t *testing.T) { + defer leaktest.AfterTest(t)() + + buffer := newBuffer(10 /* limit */) + ctx := context.Background() + + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: int64(nanos), + } + } + makeEvent := func(descID int, descType catalog.DescriptorType, timestamp hlc.Timestamp) event { + return event{ + update: spanconfig.DescriptorUpdate{ + ID: descpb.ID(descID), + DescriptorType: descType, + }, + timestamp: timestamp, + } + } + advanceRangefeeds := func(timestamp hlc.Timestamp) { + buffer.advance(descriptorsRangefeed, timestamp) + buffer.advance(zonesRangefeed, timestamp) + } + makeUpdates := func(descID int, descType catalog.DescriptorType) []spanconfig.DescriptorUpdate { + return []spanconfig.DescriptorUpdate{ + { + ID: descpb.ID(descID), + DescriptorType: descType, + }, + } + } + + concreteDescriptorTypes := []catalog.DescriptorType{ + catalog.Table, + catalog.Database, + catalog.Schema, + catalog.Type, + } + + nanos := 1 + // Test 3 events on the same ID, with a concrete descriptor event + // (e.g. catalog.Table) flanked by catalog.Any on either side. This + // should all combine to the concrete descriptor type. + { + for _, descType := range concreteDescriptorTypes { + err := buffer.add(makeEvent(1, catalog.Any, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, descType, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, catalog.Any, ts(nanos))) + require.NoError(t, err) + nanos++ + advanceRangefeeds(ts(nanos)) + + updates, combinedFrontierTS, err := buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(nanos), combinedFrontierTS) + require.Equal(t, updates, makeUpdates(1, descType)) + } + } + + // Test that every descriptor type (including catalog.Any) combines with + // itself and with catalog.Any.
+ { + for _, descType := range append(concreteDescriptorTypes, catalog.Any) { + err := buffer.add(makeEvent(1, descType, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, descType, ts(nanos))) + require.NoError(t, err) + nanos++ + err = buffer.add(makeEvent(1, catalog.Any, ts(nanos))) + require.NoError(t, err) + nanos++ + advanceRangefeeds(ts(nanos)) + + updates, combinedFrontierTS, err := buffer.flush(ctx) + require.NoError(t, err) + require.Equal(t, ts(nanos), combinedFrontierTS) + require.Equal(t, updates, makeUpdates(1, descType)) + } + } +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/main_test.go b/pkg/spanconfig/spanconfigsqlwatcher/main_test.go new file mode 100644 index 000000000000..df87ac32c5f4 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/main_test.go @@ -0,0 +1,31 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package spanconfigsqlwatcher_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" +) + +func TestMain(m *testing.M) { + security.SetAssetLoader(securitytest.EmbeddedAssets) + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go new file mode 100644 index 000000000000..9026269e89b4 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go @@ -0,0 +1,335 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package spanconfigsqlwatcher + +import ( + "context" + "sync/atomic" + "unsafe" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/errors" +) + +// Factory implements the spanconfig.SQLWatcherFactory interface. +var _ spanconfig.SQLWatcherFactory = &Factory{} + +// SQLWatcher implements the spanconfig.SQLWatcher interface. +var _ spanconfig.SQLWatcher = &SQLWatcher{} + +// Factory is used to construct spanconfig.SQLWatcher interfaces. +type Factory struct { + codec keys.SQLCodec + settings *cluster.Settings + stopper *stop.Stopper + knobs *spanconfig.TestingKnobs + rangeFeedFactory *rangefeed.Factory + bufferMemLimit int64 +} + +// NewFactory constructs a new Factory. +func NewFactory( + codec keys.SQLCodec, + settings *cluster.Settings, + rangeFeedFactory *rangefeed.Factory, + bufferMemLimit int64, + stopper *stop.Stopper, + knobs *spanconfig.TestingKnobs, +) *Factory { + if knobs == nil { + knobs = &spanconfig.TestingKnobs{} + } + return &Factory{ + codec: codec, + settings: settings, + rangeFeedFactory: rangeFeedFactory, + stopper: stopper, + bufferMemLimit: bufferMemLimit, + knobs: knobs, + } +} + +// SQLWatcher is the concrete implementation of spanconfig.SQLWatcher. It +// establishes rangefeeds over system.zones and system.descriptors to +// incrementally watch for SQL updates. 
+type SQLWatcher struct { + codec keys.SQLCodec + settings *cluster.Settings + rangeFeedFactory *rangefeed.Factory + stopper *stop.Stopper + + buffer *buffer + + knobs *spanconfig.TestingKnobs + + started int32 // accessed atomically. +} + +// sqlWatcherBufferEntrySize is the size of an entry stored in the sqlWatcher's +// buffer. We use this value to calculate the buffer capacity. +const sqlWatcherBufferEntrySize = int64(unsafe.Sizeof(event{}) + unsafe.Sizeof(rangefeedbuffer.Event(nil))) + +// New constructs a spanconfig.SQLWatcher. +func (f *Factory) New() spanconfig.SQLWatcher { + return &SQLWatcher{ + codec: f.codec, + settings: f.settings, + rangeFeedFactory: f.rangeFeedFactory, + stopper: f.stopper, + buffer: newBuffer(int(f.bufferMemLimit / sqlWatcherBufferEntrySize)), + knobs: f.knobs, + } +} + +// WatchForSQLUpdates is part of the spanconfig.SQLWatcher interface. +func (s *SQLWatcher) WatchForSQLUpdates( + ctx context.Context, + timestamp hlc.Timestamp, + handler func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error, +) error { + if !atomic.CompareAndSwapInt32(&s.started, 0, 1) { + return errors.AssertionFailedf("watcher already started watching") + } + + // The callbacks below are invoked by both the rangefeeds we establish, both + // of which run on separate goroutines. To ensure calls to the handler + // function are serial we only ever run it on the main thread of + // WatchForSQLUpdates (instead of pushing it into the rangefeed callbacks). + // The rangefeed callbacks use channels to report errors and notifications to + // flush events from the buffer. As WatchForSQLUpdates' main thread is the + // sole listener on these channels, doing expensive work in the handler + // function can lead to blocking the rangefeed, which isn't great. This is an + // unfortunate asterisk for users of this interface to be aware of.
+ // + // TODO(arul): Possibly get rid of this limitation by introducing another + // buffer interface here to store updates produced by the Watcher so that + // we can run the handler in a separate goroutine and still provide the + // serial semantics. + errCh := make(chan error) + frontierAdvanced := make(chan struct{}) + onFrontierAdvance := func(ctx context.Context, rangefeed rangefeedKind, timestamp hlc.Timestamp) { + s.buffer.advance(rangefeed, timestamp) + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is being closed, which + // happens after we've stopped listening on the frontierAdvanced channel. + case frontierAdvanced <- struct{}{}: + } + } + onEvent := func(ctx context.Context, event event) { + err := func() error { + if fn := s.knobs.SQLWatcherOnEventInterceptor; fn != nil { + if err := fn(); err != nil { + return err + } + } + return s.buffer.add(event) + }() + if err != nil { + log.Warningf(ctx, "error adding event %v: %v", event, err) + select { + case <-ctx.Done(): + // The context is canceled when the rangefeed is being closed, which + // happens after we've stopped listening on the errCh.
+ case errCh <- err: + } + } + } + + descriptorsRF, err := s.watchForDescriptorUpdates(ctx, timestamp, onEvent, onFrontierAdvance) + if err != nil { + return errors.Wrapf(err, "error establishing rangefeed over system.descriptors") + } + defer descriptorsRF.Close() + zonesRF, err := s.watchForZoneConfigUpdates(ctx, timestamp, onEvent, onFrontierAdvance) + if err != nil { + return errors.Wrapf(err, "error establishing rangefeed over system.zones") + } + defer zonesRF.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-s.stopper.ShouldQuiesce(): + return nil + case err = <-errCh: + return err + case <-frontierAdvanced: + events, combinedFrontierTS, err := s.buffer.flush(ctx) + if err != nil { + return err + } + if len(events) == 0 { + continue + } + if err := handler(ctx, events, combinedFrontierTS); err != nil { + return err + } + } + } +} + +// watchForDescriptorUpdates establishes a rangefeed over system.descriptors and +// invokes the onEvent callback for observed events. The onFrontierAdvance +// callback is invoked whenever the rangefeed frontier is advanced as well. +func (s *SQLWatcher) watchForDescriptorUpdates( + ctx context.Context, + timestamp hlc.Timestamp, + onEvent func(context.Context, event), + onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp), +) (*rangefeed.RangeFeed, error) { + descriptorTableStart := s.codec.TablePrefix(keys.DescriptorTableID) + descriptorTableSpan := roachpb.Span{ + Key: descriptorTableStart, + EndKey: descriptorTableStart.PrefixEnd(), + } + handleEvent := func(ctx context.Context, ev *roachpb.RangeFeedValue) { + if !ev.Value.IsPresent() && !ev.PrevValue.IsPresent() { + // Event for a tombstone on a tombstone -- nothing for us to do here. + return + } + value := ev.Value + if !ev.Value.IsPresent() { + // The descriptor was deleted. 
+ value = ev.PrevValue + } + + var descriptor descpb.Descriptor + if err := value.GetProto(&descriptor); err != nil { + logcrash.ReportOrPanic( + ctx, + &s.settings.SV, + "%s: failed to unmarshal descriptor %v", + ev.Key, + value, + ) + return + } + if descriptor.Union == nil { + return + } + + table, database, typ, schema := descpb.FromDescriptorWithMVCCTimestamp(&descriptor, value.Timestamp) + + var id descpb.ID + var descType catalog.DescriptorType + switch { + case table != nil: + id = table.GetID() + descType = catalog.Table + case database != nil: + id = database.GetID() + descType = catalog.Database + case typ != nil: + id = typ.GetID() + descType = catalog.Type + case schema != nil: + id = schema.GetID() + descType = catalog.Schema + default: + logcrash.ReportOrPanic(ctx, &s.settings.SV, "unknown descriptor unmarshalled %v", descriptor) + } + + rangefeedEvent := event{ + timestamp: ev.Value.Timestamp, + update: spanconfig.DescriptorUpdate{ + ID: id, + DescriptorType: descType, + }, + } + onEvent(ctx, rangefeedEvent) + } + rf, err := s.rangeFeedFactory.RangeFeed( + ctx, + "sql-watcher-descriptor-rangefeed", + descriptorTableSpan, + timestamp, + handleEvent, + rangefeed.WithDiff(), + rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) { + onFrontierAdvance(ctx, descriptorsRangefeed, resolvedTS) + }), + ) + if err != nil { + return nil, err + } + + log.Infof(ctx, "established range feed over system.descriptors table starting at time %s", timestamp) + return rf, nil +} + +// watchForZoneConfigUpdates establishes a rangefeed over system.zones and +// invokes the onEvent callback whenever an event is observed. The +// onFrontierAdvance callback is also invoked whenever the rangefeed frontier is +// advanced. 
+func (s *SQLWatcher) watchForZoneConfigUpdates( + ctx context.Context, + timestamp hlc.Timestamp, + onEvent func(context.Context, event), + onFrontierAdvance func(context.Context, rangefeedKind, hlc.Timestamp), +) (*rangefeed.RangeFeed, error) { + zoneTableStart := s.codec.TablePrefix(keys.ZonesTableID) + zoneTableSpan := roachpb.Span{ + Key: zoneTableStart, + EndKey: zoneTableStart.PrefixEnd(), + } + + decoder := newZonesDecoder(s.codec) + handleEvent := func(ctx context.Context, ev *roachpb.RangeFeedValue) { + descID, err := decoder.DecodePrimaryKey(ev.Key) + if err != nil { + logcrash.ReportOrPanic( + ctx, + &s.settings.SV, + "sql watcher zones range feed error: %v", + err, + ) + return + } + + rangefeedEvent := event{ + timestamp: ev.Value.Timestamp, + update: spanconfig.DescriptorUpdate{ + ID: descID, + DescriptorType: catalog.Any, + }, + } + onEvent(ctx, rangefeedEvent) + } + rf, err := s.rangeFeedFactory.RangeFeed( + ctx, + "sql-watcher-zones-rangefeed", + zoneTableSpan, + timestamp, + handleEvent, + rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) { + onFrontierAdvance(ctx, zonesRangefeed, resolvedTS) + }), + ) + if err != nil { + return nil, err + } + + log.Infof(ctx, "established range feed over system.zones table starting at time %s", timestamp) + return rf, nil +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go new file mode 100644 index 000000000000..39e8dd9cd7f7 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher_test.go @@ -0,0 +1,382 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package spanconfigsqlwatcher_test + +import ( + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestSQLWatcherReactsToUpdates verifies that the SQLWatcher emits the correct +// updates following changes made to system.descriptor or system.zones. It also +// ensures that the timestamp supplied to the handler function is monotonically +// increasing. +func TestSQLWatcherReactsToUpdates(t *testing.T) { + defer leaktest.AfterTest(t)() + + testCases := []struct { + setup string + stmt string + expectedIDs descpb.IDs + }{ + { + stmt: "CREATE TABLE t()", + expectedIDs: descpb.IDs{52}, + }, + { + setup: "CREATE TABLE t2()", + stmt: "ALTER TABLE t2 CONFIGURE ZONE USING num_replicas = 3", + expectedIDs: descpb.IDs{53}, + }, + { + setup: "CREATE DATABASE d; CREATE TABLE d.t1(); CREATE TABLE d.t2()", + stmt: "ALTER DATABASE d CONFIGURE ZONE USING num_replicas=5", + expectedIDs: descpb.IDs{54}, + }, + { + setup: "CREATE TABLE t3(); CREATE TABLE t4()", + stmt: "ALTER TABLE t3 CONFIGURE ZONE USING num_replicas=5; CREATE TABLE t5(); DROP TABLE t4;", + expectedIDs: descpb.IDs{57, 58, 59}, + }, + // Named zone tests. 
+ { + stmt: "ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.RootNamespaceID}, + }, + { + stmt: "ALTER RANGE liveness CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.LivenessRangesID}, + }, + { + stmt: "ALTER RANGE meta CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.MetaRangesID}, + }, + { + stmt: "ALTER RANGE system CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.SystemRangesID}, + }, + { + stmt: "ALTER RANGE timeseries CONFIGURE ZONE USING num_replicas = 7", + expectedIDs: descpb.IDs{keys.TimeseriesRangesID}, + }, + // Test that events on types/schemas are also captured. + { + setup: "CREATE DATABASE db", + stmt: "CREATE SCHEMA db.sc", + // One ID each for the parent database and the schema. + expectedIDs: descpb.IDs{60, 61}, + }, + { + stmt: "CREATE TYPE typ AS ENUM()", + // One ID each for the enum and the array type. + expectedIDs: descpb.IDs{62, 63}, + }, + } + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. + }, + }, + }) + defer tc.Stopper().Stop(context.Background()) + + ts := tc.Server(0 /* idx */) + + sqlDB := tc.ServerConn(0 /* idx */) + for _, tc := range testCases { + sqlWatcher := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + nil, /* knobs */ + ).New() + + _, err := sqlDB.Exec(tc.setup) + require.NoError(t, err) + + // Save the startTS here before the test case is executed to ensure the + // watcher can pick up the change when we start it in a separate goroutine. 
+ startTS := ts.Clock().Now() + + var mu struct { + syncutil.Mutex + receivedIDs map[descpb.ID]struct{} + } + mu.receivedIDs = make(map[descpb.ID]struct{}) + + watcherCtx, watcherCancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + prevCheckpointTS := startTS + _ = sqlWatcher.WatchForSQLUpdates(watcherCtx, + startTS, + func(ctx context.Context, updates []spanconfig.DescriptorUpdate, checkpointTS hlc.Timestamp) error { + require.True(t, prevCheckpointTS.Less(checkpointTS)) + mu.Lock() + defer mu.Unlock() + for _, update := range updates { + mu.receivedIDs[update.ID] = struct{}{} + } + prevCheckpointTS = checkpointTS + return nil + }) + }() + + _, err = sqlDB.Exec(tc.stmt) + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + mu.Lock() + defer mu.Unlock() + if len(mu.receivedIDs) == len(tc.expectedIDs) { + return nil + } + return errors.Newf("expected to receive %d IDs, but found %d", len(tc.expectedIDs), len(mu.receivedIDs)) + }) + + // Rangefeed events aren't guaranteed to be in any particular order for + // different keys. + mu.Lock() + for _, id := range tc.expectedIDs { + _, seen := mu.receivedIDs[id] + require.True(t, seen) + } + mu.Unlock() + + // Stop the watcher and wait for the goroutine to complete. + watcherCancel() + wg.Wait() + } +} + +// TestSQLWatcherFactory tests that the SQLWatcherFactory can create multiple +// SQLWatchers and that every SQLWatcher is only good for a single +// WatchForSQLUpdates. +func TestSQLWatcherFactory(t *testing.T) { + defer leaktest.AfterTest(t)() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. 
+ }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0 /* idx */) + sqlDB := tc.ServerConn(0 /* idx */) + + sqlWatcherFactory := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + nil, /* knobs */ + ) + + startTS := ts.Clock().Now() + _, err := sqlDB.Exec("CREATE TABLE t()") + require.NoError(t, err) + + sqlWatcher := sqlWatcherFactory.New() + + var wg sync.WaitGroup + + watcherCtx, watcherCancel := context.WithCancel(ctx) + wg.Add(1) + go func() { + defer wg.Done() + _ = sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Error("handler should never run") + return nil + }) + }() + + watcherCancel() + wg.Wait() + + err = sqlWatcher.WatchForSQLUpdates(watcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Fatal("handler should never run") + return nil + }) + require.Error(t, err) + require.True(t, testutils.IsError(err, "watcher already started watching")) + + newSQLWatcher := sqlWatcherFactory.New() + newWatcherCtx, newWatcherCancel := context.WithCancel(ctx) + + wg.Add(1) + go func() { + defer wg.Done() + err = newSQLWatcher.WatchForSQLUpdates(newWatcherCtx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Error("handler should never run") + return nil + }) + require.True(t, testutils.IsError(err, "context canceled")) + }() + + newWatcherCancel() + wg.Wait() +} + +// TestSQLWatcherOnEventError ensures that if there is an error processing a +// rangefeed event the handler is never run and the error is bubbled back to +// the caller. 
+func TestSQLWatcherOnEventError(t *testing.T) { + defer leaktest.AfterTest(t)() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. + }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0 /* idx */) + sqlDB := tc.ServerConn(0 /* idx */) + + sqlWatcherFactory := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + &spanconfig.TestingKnobs{ + SQLWatcherOnEventInterceptor: func() error { + return errors.New("boom") + }, + }, + ) + + startTS := ts.Clock().Now() + _, err := sqlDB.Exec("CREATE TABLE t()") + require.NoError(t, err) + + sqlWatcher := sqlWatcherFactory.New() + err = sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + t.Fatal("handler should never run") + return nil + }) + require.Error(t, err) + require.True(t, testutils.IsError(err, "boom")) +} + +// TestSQLWatcherHandlerError ensures that no further calls are made to the +// handler after it returns an error. +func TestSQLWatcherHandlerError(t *testing.T) { + defer leaktest.AfterTest(t)() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, // disable the automatic job creation. + }, + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. 
+ }, + }, + }) + ctx := context.Background() + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0 /* idx */) + sqlDB := tc.ServerConn(0 /* idx */) + + sqlWatcher := spanconfigsqlwatcher.NewFactory( + keys.SystemSQLCodec, + ts.ClusterSettings(), + ts.RangeFeedFactory().(*rangefeed.Factory), + 1<<20, /* 1 MB, bufferMemLimit */ + ts.Stopper(), + nil, /* knobs */ + ).New() + + _, err := sqlDB.Exec("CREATE TABLE t()") + require.NoError(t, err) + + stopTraffic := make(chan struct{}) + startTS := ts.Clock().Now() + + var wg sync.WaitGroup + wg.Add(1) + + // Start a background thread that modifies zone configs on a table to trigger + // events on system.zones. + go func() { + defer wg.Done() + for { + select { + case <-stopTraffic: + return + default: + } + _, err = sqlDB.Exec("ALTER TABLE t CONFIGURE ZONE USING num_replicas=5") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + } + }() + + var numCalled int32 + // Wrap the call to WatchForSQLUpdates in a SucceedsSoon to ensure it + // evaluates within 45 seconds. + testutils.SucceedsSoon(t, func() error { + err := sqlWatcher.WatchForSQLUpdates(ctx, startTS, func(context.Context, []spanconfig.DescriptorUpdate, hlc.Timestamp) error { + atomic.AddInt32(&numCalled, 1) + return errors.New("handler error") + }) + require.Error(t, err) + require.True(t, testutils.IsError(err, "handler error")) + return nil + }) + + // Shutdown the background goroutine. + close(stopTraffic) + wg.Wait() + + // Ensure that the handler was called only once. + require.Equal(t, int32(1), atomic.LoadInt32(&numCalled)) +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go new file mode 100644 index 000000000000..7d27e77b91d3 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigsqlwatcher + +import ( + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/errors" +) + +// zonesDecoder decodes the zone ID (primary key) of rows from system.zones. +// It's not safe for concurrent use. +type zonesDecoder struct { + alloc rowenc.DatumAlloc + codec keys.SQLCodec +} + +// newZonesDecoder instantiates a zonesDecoder. +func newZonesDecoder(codec keys.SQLCodec) *zonesDecoder { + return &zonesDecoder{ + codec: codec, + } +} + +// DecodePrimaryKey decodes the primary key (zone ID) from the system.zones +// table. +func (zd *zonesDecoder) DecodePrimaryKey(key roachpb.Key) (descpb.ID, error) { + // Decode the descriptor ID from the key. 
+ tbl := systemschema.ZonesTable + types := []*types.T{tbl.PublicColumns()[0].GetType()} + startKeyRow := make([]rowenc.EncDatum, 1) + _, matches, _, err := rowenc.DecodeIndexKey( + zd.codec, types, startKeyRow, nil /* colDirs */, key, + ) + if err != nil || !matches { + return descpb.InvalidID, errors.AssertionFailedf("failed to decode key in system.zones %v", key) + } + if err := startKeyRow[0].EnsureDecoded(types[0], &zd.alloc); err != nil { + return descpb.InvalidID, errors.AssertionFailedf("failed to decode key in system.zones %v", key) + } + descID := descpb.ID(tree.MustBeDInt(startKeyRow[0].Datum)) + return descID, nil +} + +// TestingZonesDecoderDecodePrimaryKey constructs a zonesDecoder using the given +// codec and decodes the supplied key using it. This wrapper is exported for +// testing purposes to ensure the struct remains private. +func TestingZonesDecoderDecodePrimaryKey(codec keys.SQLCodec, key roachpb.Key) (descpb.ID, error) { + return newZonesDecoder(codec).DecodePrimaryKey(key) +} diff --git a/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder_test.go b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder_test.go new file mode 100644 index 000000000000..3a009b072a16 --- /dev/null +++ b/pkg/spanconfig/spanconfigsqlwatcher/zonesdecoder_test.go @@ -0,0 +1,108 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package spanconfigsqlwatcher_test + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" +) + +// TestZonesDecoderDecodePrimaryKey verifies that we can decode the primary key +// stored in a system.zones-like table. +func TestZonesDecoderDecodePrimaryKey(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + ManagerDisableJobCreation: true, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + // Create a dummy table, like system.zones, to modify in this test. This lets + // us test things without bothering with the prepopulated contents of + // system.zones. + // + // Note that system.zones has two column families (for legacy reasons), but + // the dummy table constructed below does not. This shouldn't matter + // as the decoder is only decoding the primary key in this test, which doesn't + // change across different column families.
+ const dummyTableName = "dummy_zones" + sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE %s (LIKE system.zones INCLUDING ALL)", dummyTableName)) + + var dummyTableID uint32 + sqlDB.QueryRow( + t, + fmt.Sprintf("SELECT id FROM system.namespace WHERE name='%s'", dummyTableName), + ).Scan(&dummyTableID) + + k := keys.SystemSQLCodec.TablePrefix(dummyTableID) + + entries := []struct { + id descpb.ID + proto zonepb.ZoneConfig + }{ + { + id: 50, + proto: zonepb.ZoneConfig{}, + }, + { + id: 55, + proto: zonepb.DefaultZoneConfig(), + }, + { + id: 60, + proto: zonepb.ZoneConfig{ + NumReplicas: proto.Int32(5), + }, + }, + } + + for _, entry := range entries { + buf, err := protoutil.Marshal(&entry.proto) + require.NoError(t, err) + + _ = sqlDB.Exec( + t, fmt.Sprintf("UPSERT INTO %s (id, config) VALUES ($1, $2) ", dummyTableName), entry.id, buf, + ) + require.NoError(t, err) + } + + rows, err := tc.Server(0).DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + require.Equal(t, len(entries), len(rows)) + + for i, row := range rows { + got, err := spanconfigsqlwatcher.TestingZonesDecoderDecodePrimaryKey(keys.SystemSQLCodec, row.Key) + require.NoError(t, err) + require.Equal(t, entries[i].id, got) + } +} diff --git a/pkg/spanconfig/testing_knobs.go b/pkg/spanconfig/testing_knobs.go index acde9d9f34b6..38b9489c02e1 100644 --- a/pkg/spanconfig/testing_knobs.go +++ b/pkg/spanconfig/testing_knobs.go @@ -53,6 +53,10 @@ type TestingKnobs struct { // StoreKVSubscriberOverride is used to override the KVSubscriber used when // setting up a new store. StoreKVSubscriberOverride KVSubscriber + + // SQLWatcherOnEventInterceptor, if set, is invoked when the SQLWatcher + // receives an event on one of its rangefeeds. + SQLWatcherOnEventInterceptor func() error } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. 
diff --git a/pkg/testutils/serverutils/test_server_shim.go b/pkg/testutils/serverutils/test_server_shim.go index 8aca2cb17a41..5bc6701c3a16 100644 --- a/pkg/testutils/serverutils/test_server_shim.go +++ b/pkg/testutils/serverutils/test_server_shim.go @@ -122,6 +122,10 @@ type TestServerInterface interface { // an interface{}. SpanConfigSQLTranslator() interface{} + // SpanConfigSQLWatcherFactory returns the underlying + // spanconfig.SQLWatcherFactory as an interface{}. + SpanConfigSQLWatcherFactory() interface{} + // SQLServer returns the *sql.Server as an interface{}. SQLServer() interface{}