diff --git a/pkg/server/settingswatcher/BUILD.bazel b/pkg/server/settingswatcher/BUILD.bazel index 87c666ff9967..a92eadf5a7e7 100644 --- a/pkg/server/settingswatcher/BUILD.bazel +++ b/pkg/server/settingswatcher/BUILD.bazel @@ -50,6 +50,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/rangefeed", "//pkg/kv/kvclient/rangefeed/rangefeedcache", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", diff --git a/pkg/server/settingswatcher/settings_watcher.go b/pkg/server/settingswatcher/settings_watcher.go index 09afd8d88079..ece382f3dac2 100644 --- a/pkg/server/settingswatcher/settings_watcher.go +++ b/pkg/server/settingswatcher/settings_watcher.go @@ -49,7 +49,7 @@ type SettingsWatcher struct { syncutil.Mutex updater settings.Updater - values map[string]settings.EncodedValue + values map[string]settingsValue overrides map[string]settings.EncodedValue } @@ -130,7 +130,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error { } } - s.mu.values = make(map[string]settings.EncodedValue) + s.mu.values = make(map[string]settingsValue) if s.overridesMonitor != nil { s.mu.overrides = make(map[string]settings.EncodedValue) @@ -250,25 +250,52 @@ func (s *SettingsWatcher) handleKV( } } + s.maybeSet(ctx, name, settingsValue{ + val: val, + ts: kv.Value.Timestamp, + tombstone: tombstone, + }) + if s.storage != nil { + return kv + } + return nil +} + +// maybeSet will update the stored value and the corresponding setting +// in response to a kv event, assuming that event is new. +func (s *SettingsWatcher) maybeSet(ctx context.Context, name string, sv settingsValue) { s.mu.Lock() defer s.mu.Unlock() + // Skip updates which have an earlier timestamp to avoid regressing on the + // value of a setting. Note that we intentionally process values at the same + // timestamp as the current value. This is important to deal with cases where + // the underlying rangefeed restarts. 
When that happens, we'll construct a + // new settings updater and expect to re-process every setting which is + // currently set. + if existing, ok := s.mu.values[name]; ok && sv.ts.Less(existing.ts) { + return + } _, hasOverride := s.mu.overrides[name] - if tombstone { + s.mu.values[name] = sv + if sv.tombstone { // This event corresponds to a deletion. - delete(s.mu.values, name) if !hasOverride { s.setDefaultLocked(ctx, name) } } else { - s.mu.values[name] = val if !hasOverride { - s.setLocked(ctx, name, val) + s.setLocked(ctx, name, sv.val) } } - if s.storage != nil { - return kv - } - return nil +} + +// settingsValue tracks an observed value from the rangefeed. By tracking the +// timestamp, we can avoid regressing the settings values in the face of +// rangefeed restarts. +type settingsValue struct { + val settings.EncodedValue + ts hlc.Timestamp + tombstone bool } const versionSettingKey = "version" @@ -343,8 +370,8 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) { // Reset the setting to the value in the settings table (or the default // value). 
- if val, ok := s.mu.values[key]; ok { - s.setLocked(ctx, key, val) + if sv, ok := s.mu.values[key]; ok && !sv.tombstone { + s.setLocked(ctx, key, sv.val) } else { s.setDefaultLocked(ctx, key) } diff --git a/pkg/server/settingswatcher/settings_watcher_external_test.go b/pkg/server/settingswatcher/settings_watcher_external_test.go index 9a29e18ea611..531b142f5ab0 100644 --- a/pkg/server/settingswatcher/settings_watcher_external_test.go +++ b/pkg/server/settingswatcher/settings_watcher_external_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/settingswatcher" "github.com/cockroachdb/cockroach/pkg/settings" @@ -431,3 +432,142 @@ func CheckSettingsValuesMatch(t *testing.T, a, b *cluster.Settings) error { } return nil } + +// Test that when the rangefeed sends a prefix of events (as it is allowed to +// do), the setting value that clients read does not regress. +func TestStaleRowsDoNotCauseSettingsToRegress(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + + // Make a bogus tenant ID and codec we'll use to prefix the events + // we want to observe. Below, inject a testing knob to plumb the stream + // into the test logic to make injecting rangefeed events straightforward. 
+ bogusTenantID := roachpb.MakeTenantID(42) + bogusCodec := keys.MakeSQLCodec(bogusTenantID) + settingsStart := bogusCodec.TablePrefix(keys.SettingsTableID) + interceptedStreamCh := make(chan roachpb.RangeFeedEventSink) + cancelCtx, cancel := context.WithCancel(ctx) + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingRangefeedFilter: func(args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink) *roachpb.Error { + if !args.Span.ContainsKey(settingsStart) { + return nil + } + select { + case interceptedStreamCh <- stream: + case <-cancelCtx.Done(): + } + <-cancelCtx.Done() + return nil + }, + }, + }, + }) + defer s.Stopper().Stop(ctx) + defer cancel() + tdb := sqlutils.MakeSQLRunner(sqlDB) + + const ( + defaultFakeSettingValue = "foo" + fakeSettingName = "test_setting" + ) + + fakeSetting := settings.RegisterStringSetting( + settings.TenantWritable, fakeSettingName, "for testing", defaultFakeSettingValue, + ) + + // Set a cluster setting in the real cluster and read its raw KV. + // This will form the basis for events we inject into the fake watcher. + // The tenant prefix, if one exists, will have been stripped from the + // key. 
+ getSettingKVForFakeSetting := func(t *testing.T) roachpb.KeyValue { + codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec + k := codec.TablePrefix(keys.SettingsTableID) + rows, err := s.DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */) + require.NoError(t, err) + dec := settingswatcher.MakeRowDecoder(codec) + for _, r := range rows { + rkv := roachpb.KeyValue{Key: r.Key} + if r.Value != nil { + rkv.Value = *r.Value + } + name, _, _, err := dec.DecodeRow(rkv) + require.NoError(t, err) + if name == fakeSettingName { + rkv.Key, err = codec.StripTenantPrefix(rkv.Key) + require.NoError(t, err) + rkv.Value.ClearChecksum() + rkv.Value.InitChecksum(rkv.Key) + return rkv + } + } + t.Fatalf("failed to find setting %v", fakeSettingName) + return roachpb.KeyValue{} // unreachable + } + + // newRangeFeedEvent creates a RangeFeedEvent for the bogus tenant using a KV + // which has a stripped prefix. It also sets the timestamp. + newRangeFeedEvent := func(kv roachpb.KeyValue, ts hlc.Timestamp) *roachpb.RangeFeedEvent { + kv.Key = append(bogusCodec.TenantPrefix(), kv.Key...) 
+ kv.Value.Timestamp = ts + kv.Value.ClearChecksum() + kv.Value.InitChecksum(kv.Key) + return &roachpb.RangeFeedEvent{ + Val: &roachpb.RangeFeedValue{Key: kv.Key, Value: kv.Value}, + } + } + sideSettings := cluster.MakeTestingClusterSettings() + settingIsSoon := func(t *testing.T, exp string) { + testutils.SucceedsSoon(t, func() error { + if got := fakeSetting.Get(&sideSettings.SV); got != exp { + return errors.Errorf("expected %v, got %v", exp, got) + } + return nil + }) + } + settingStillHasValueAfterAShortWhile := func(t *testing.T, exp string) { + const aShortWhile = 10 * time.Millisecond + require.Equal(t, exp, fakeSetting.Get(&sideSettings.SV)) + time.Sleep(aShortWhile) + require.Equal(t, exp, fakeSetting.Get(&sideSettings.SV)) + } + w := settingswatcher.New( + s.Clock(), + bogusCodec, + sideSettings, + s.RangeFeedFactory().(*rangefeed.Factory), + s.Stopper(), + nil, + ) + // Start the watcher, make sure the value is the default, and intercept + // the rangefeed. + require.NoError(t, w.Start(ctx)) + require.Equal(t, defaultFakeSettingValue, fakeSetting.Get(&sideSettings.SV)) + stream := <-interceptedStreamCh + require.Equal(t, defaultFakeSettingValue, fakeSetting.Get(&sideSettings.SV)) + + // Synthesize a proper KV value by writing the setting into the real settings + // table and then use that to inject a properly prefixed value into the stream. 
+ const newSettingValue = "bar" + tdb.Exec(t, "SET CLUSTER SETTING "+fakeSettingName+" = $1", newSettingValue) + setting1KV := getSettingKVForFakeSetting(t) + + ts0 := s.Clock().Now() + ts1 := ts0.Next() + ts2 := ts1.Next() + tombstone := setting1KV + tombstone.Value.RawBytes = nil + + require.NoError(t, stream.Send(newRangeFeedEvent(setting1KV, ts1))) + settingIsSoon(t, newSettingValue) + + require.NoError(t, stream.Send(newRangeFeedEvent(tombstone, ts0))) + settingStillHasValueAfterAShortWhile(t, newSettingValue) + + require.NoError(t, stream.Send(newRangeFeedEvent(tombstone, ts2))) + settingIsSoon(t, defaultFakeSettingValue) + require.NoError(t, stream.Send(newRangeFeedEvent(setting1KV, ts1))) + settingStillHasValueAfterAShortWhile(t, defaultFakeSettingValue) +}