Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/settingswatcher: track timestamps so values do not regress #87564

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/server/settingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down
51 changes: 39 additions & 12 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SettingsWatcher struct {
syncutil.Mutex

updater settings.Updater
values map[string]settings.EncodedValue
values map[string]settingsValue
overrides map[string]settings.EncodedValue
}

Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
}
}

s.mu.values = make(map[string]settings.EncodedValue)
s.mu.values = make(map[string]settingsValue)

if s.overridesMonitor != nil {
s.mu.overrides = make(map[string]settings.EncodedValue)
Expand Down Expand Up @@ -250,25 +250,52 @@ func (s *SettingsWatcher) handleKV(
}
}

s.maybeSet(ctx, name, settingsValue{
val: val,
ts: kv.Value.Timestamp,
tombstone: tombstone,
})
if s.storage != nil {
return kv
}
return nil
}

// maybeSet will update the stored value and the corresponding setting
// in response to a kv event, assuming that event is new.
func (s *SettingsWatcher) maybeSet(ctx context.Context, name string, sv settingsValue) {
s.mu.Lock()
defer s.mu.Unlock()
// Skip updates which have an earlier timestamp to avoid regressing on the
// value of a setting. Note that we intentionally process values at the same
// timestamp as the current value. This is important to deal with cases where
// the underlying rangefeed restarts. When that happens, we'll construct a
// new settings updater and expect to re-process every setting which is
// currently set.
if existing, ok := s.mu.values[name]; ok && sv.ts.Less(existing.ts) {
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a comment here (or on the func) as to why it'd be out of order might be nice; I'd ague I'd have expected it here more than on the struct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
_, hasOverride := s.mu.overrides[name]
if tombstone {
s.mu.values[name] = sv
if sv.tombstone {
// This event corresponds to a deletion.
delete(s.mu.values, name)
if !hasOverride {
s.setDefaultLocked(ctx, name)
}
} else {
s.mu.values[name] = val
if !hasOverride {
s.setLocked(ctx, name, val)
s.setLocked(ctx, name, sv.val)
}
}
if s.storage != nil {
return kv
}
return nil
}

// settingValue tracks an observed value from the rangefeed. By tracking the
// timestamp, we can avoid regressing the settings values in the face of
// rangefeed restarts.
type settingsValue struct {
val settings.EncodedValue
ts hlc.Timestamp
tombstone bool
}

const versionSettingKey = "version"
Expand Down Expand Up @@ -343,8 +370,8 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) {

// Reset the setting to the value in the settings table (or the default
// value).
if val, ok := s.mu.values[key]; ok {
s.setLocked(ctx, key, val)
if sv, ok := s.mu.values[key]; ok && !sv.tombstone {
s.setLocked(ctx, key, sv.val)
} else {
s.setDefaultLocked(ctx, key)
}
Expand Down
140 changes: 140 additions & 0 deletions pkg/server/settingswatcher/settings_watcher_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -431,3 +432,142 @@ func CheckSettingsValuesMatch(t *testing.T, a, b *cluster.Settings) error {
}
return nil
}

// Test that when the rangefeed sends a prefix of events (as it is allowed to
// do), that the setting value that clients read does not regress.
func TestStaleRowsDoNotCauseSettingsToRegress(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()

// Make a bogus tenant ID and codec we'll use to prefix the events
// we want to observe. Below, inject a testing knob to plumb the stream
// into the test logic to make injecting rangefeed events straightforward.
bogusTenantID := roachpb.MakeTenantID(42)
bogusCodec := keys.MakeSQLCodec(bogusTenantID)
settingsStart := bogusCodec.TablePrefix(keys.SettingsTableID)
interceptedStreamCh := make(chan roachpb.RangeFeedEventSink)
cancelCtx, cancel := context.WithCancel(ctx)
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRangefeedFilter: func(args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink) *roachpb.Error {
if !args.Span.ContainsKey(settingsStart) {
return nil
}
select {
case interceptedStreamCh <- stream:
case <-cancelCtx.Done():
}
<-cancelCtx.Done()
return nil
},
},
},
})
defer s.Stopper().Stop(ctx)
defer cancel()
tdb := sqlutils.MakeSQLRunner(sqlDB)

const (
defaultFakeSettingValue = "foo"
fakeSettingName = "test_setting"
)

fakeSetting := settings.RegisterStringSetting(
settings.TenantWritable, fakeSettingName, "for testing", defaultFakeSettingValue,
)

// Set a cluster setting in the real cluster and read its raw KV.
// This will form the basis for events we inject into the fake watcher.
// The tenant prefix, if one exists, will have been stripped from the
// key.
getSettingKVForFakeSetting := func(t *testing.T) roachpb.KeyValue {
codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
k := codec.TablePrefix(keys.SettingsTableID)
rows, err := s.DB().Scan(ctx, k, k.PrefixEnd(), 0 /* maxRows */)
require.NoError(t, err)
dec := settingswatcher.MakeRowDecoder(codec)
for _, r := range rows {
rkv := roachpb.KeyValue{Key: r.Key}
if r.Value != nil {
rkv.Value = *r.Value
}
name, _, _, err := dec.DecodeRow(rkv)
require.NoError(t, err)
if name == fakeSettingName {
rkv.Key, err = codec.StripTenantPrefix(rkv.Key)
require.NoError(t, err)
rkv.Value.ClearChecksum()
rkv.Value.InitChecksum(rkv.Key)
return rkv
}
}
t.Fatalf("failed to find setting %v", fakeSettingName)
return roachpb.KeyValue{} // unreachable
}

// newRangeFeedEvent creates a RangeFeedEvent for the bogus tenant using a KV
// which has a stripped prefix. It also sets the timestamp.
newRangeFeedEvent := func(kv roachpb.KeyValue, ts hlc.Timestamp) *roachpb.RangeFeedEvent {
kv.Key = append(bogusCodec.TenantPrefix(), kv.Key...)
kv.Value.Timestamp = ts
kv.Value.ClearChecksum()
kv.Value.InitChecksum(kv.Key)
return &roachpb.RangeFeedEvent{
Val: &roachpb.RangeFeedValue{Key: kv.Key, Value: kv.Value},
}
}
sideSettings := cluster.MakeTestingClusterSettings()
settingIsSoon := func(t *testing.T, exp string) {
testutils.SucceedsSoon(t, func() error {
if got := fakeSetting.Get(&sideSettings.SV); got != exp {
return errors.Errorf("expected %v, got %v", exp, got)
}
return nil
})
}
settingStillHasValueAfterAShortWhile := func(t *testing.T, exp string) {
const aShortWhile = 10 * time.Millisecond
require.Equal(t, exp, fakeSetting.Get(&sideSettings.SV))
time.Sleep(aShortWhile)
require.Equal(t, exp, fakeSetting.Get(&sideSettings.SV))
}
w := settingswatcher.New(
s.Clock(),
bogusCodec,
sideSettings,
s.RangeFeedFactory().(*rangefeed.Factory),
s.Stopper(),
nil,
)
// Start the watcher, make sure the value is the default, and intercept
// the rangefeed.
require.NoError(t, w.Start(ctx))
require.Equal(t, defaultFakeSettingValue, fakeSetting.Get(&sideSettings.SV))
stream := <-interceptedStreamCh
require.Equal(t, defaultFakeSettingValue, fakeSetting.Get(&sideSettings.SV))

// Synthesize a proper KV value by writing the setting into the real settings
// table and then use that to inject a properly prefixed value into the stream.
const newSettingValue = "bar"
tdb.Exec(t, "SET CLUSTER SETTING "+fakeSettingName+" = $1", newSettingValue)
setting1KV := getSettingKVForFakeSetting(t)

ts0 := s.Clock().Now()
ts1 := ts0.Next()
ts2 := ts1.Next()
tombstone := setting1KV
tombstone.Value.RawBytes = nil

require.NoError(t, stream.Send(newRangeFeedEvent(setting1KV, ts1)))
settingIsSoon(t, newSettingValue)

require.NoError(t, stream.Send(newRangeFeedEvent(tombstone, ts0)))
settingStillHasValueAfterAShortWhile(t, newSettingValue)

require.NoError(t, stream.Send(newRangeFeedEvent(tombstone, ts2)))
settingIsSoon(t, defaultFakeSettingValue)
require.NoError(t, stream.Send(newRangeFeedEvent(setting1KV, ts1)))
settingStillHasValueAfterAShortWhile(t, defaultFakeSettingValue)
}