Skip to content

Commit

Permalink
Merge #87564
Browse files Browse the repository at this point in the history
87564: server/settingswatcher: track timestamps so values do not regress r=ajwerner a=ajwerner

A rangefeed is allowed to re-send values it has already delivered. When it did, the observed value of a setting could regress to an older value. There is no need for this: by tracking the timestamp of each observed value, we can ensure that a setting never regresses.

Fixes #87502

Relates to #87201

Release note (bug fix): In rare cases, the value of a cluster setting could regress soon after it was set. This no longer happens for a given gateway node.

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Sep 9, 2022
2 parents c20e8a1 + ffea578 commit a79439e
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 12 deletions.
1 change: 1 addition & 0 deletions pkg/server/settingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedcache",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down
51 changes: 39 additions & 12 deletions pkg/server/settingswatcher/settings_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SettingsWatcher struct {
syncutil.Mutex

updater settings.Updater
values map[string]settings.EncodedValue
values map[string]settingsValue
overrides map[string]settings.EncodedValue
}

Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *SettingsWatcher) Start(ctx context.Context) error {
}
}

s.mu.values = make(map[string]settings.EncodedValue)
s.mu.values = make(map[string]settingsValue)

if s.overridesMonitor != nil {
s.mu.overrides = make(map[string]settings.EncodedValue)
Expand Down Expand Up @@ -250,25 +250,52 @@ func (s *SettingsWatcher) handleKV(
}
}

s.maybeSet(ctx, name, settingsValue{
val: val,
ts: kv.Value.Timestamp,
tombstone: tombstone,
})
if s.storage != nil {
return kv
}
return nil
}

// maybeSet will update the stored value and the corresponding setting
// in response to a kv event, assuming that event is new.
func (s *SettingsWatcher) maybeSet(ctx context.Context, name string, sv settingsValue) {
s.mu.Lock()
defer s.mu.Unlock()
// Skip updates which have an earlier timestamp to avoid regressing on the
// value of a setting. Note that we intentionally process values at the same
// timestamp as the current value. This is important to deal with cases where
// the underlying rangefeed restarts. When that happens, we'll construct a
// new settings updater and expect to re-process every setting which is
// currently set.
if existing, ok := s.mu.values[name]; ok && sv.ts.Less(existing.ts) {
return
}
_, hasOverride := s.mu.overrides[name]
if tombstone {
s.mu.values[name] = sv
if sv.tombstone {
// This event corresponds to a deletion.
delete(s.mu.values, name)
if !hasOverride {
s.setDefaultLocked(ctx, name)
}
} else {
s.mu.values[name] = val
if !hasOverride {
s.setLocked(ctx, name, val)
s.setLocked(ctx, name, sv.val)
}
}
if s.storage != nil {
return kv
}
return nil
}

// settingsValue tracks an observed value from the rangefeed. By tracking the
// timestamp, we can avoid regressing the settings values in the face of
// rangefeed restarts.
type settingsValue struct {
// val is the encoded setting value carried by the event.
val settings.EncodedValue
// ts is the timestamp of the KV the event was built from. maybeSet drops
// events whose ts is earlier than the one already recorded for the same
// setting name.
ts hlc.Timestamp
// tombstone is true when the event corresponds to a deletion of the setting.
tombstone bool
}

// versionSettingKey is the string "version".
// NOTE(review): presumably the name under which the cluster version setting is
// stored; its uses are outside this view — confirm.
const versionSettingKey = "version"
Expand Down Expand Up @@ -343,8 +370,8 @@ func (s *SettingsWatcher) updateOverrides(ctx context.Context) {

// Reset the setting to the value in the settings table (or the default
// value).
if val, ok := s.mu.values[key]; ok {
s.setLocked(ctx, key, val)
if sv, ok := s.mu.values[key]; ok && !sv.tombstone {
s.setLocked(ctx, key, sv.val)
} else {
s.setDefaultLocked(ctx, key)
}
Expand Down
140 changes: 140 additions & 0 deletions pkg/server/settingswatcher/settings_watcher_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -431,3 +432,142 @@ func CheckSettingsValuesMatch(t *testing.T, a, b *cluster.Settings) error {
}
return nil
}

// TestStaleRowsDoNotCauseSettingsToRegress verifies that a rangefeed which
// re-delivers events it has already sent (as it is allowed to do) cannot make
// the setting value that clients read move backwards.
func TestStaleRowsDoNotCauseSettingsToRegress(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()

	// The watcher under test runs against a made-up tenant. A store testing
	// knob captures the rangefeed stream opened over that tenant's settings
	// table and hands it to the test, which then injects events by hand.
	tenantCodec := keys.MakeSQLCodec(roachpb.MakeTenantID(42))
	settingsPrefix := tenantCodec.TablePrefix(keys.SettingsTableID)
	streamCh := make(chan roachpb.RangeFeedEventSink)
	filterCtx, stopFilter := context.WithCancel(ctx)
	interceptSettingsFeed := func(
		args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
	) *roachpb.Error {
		if args.Span.ContainsKey(settingsPrefix) {
			// Hand the stream over (unless the test is already shutting down)
			// and then hold the rangefeed open until the test is done.
			select {
			case streamCh <- stream:
			case <-filterCtx.Done():
			}
			<-filterCtx.Done()
		}
		return nil
	}
	srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			Store: &kvserver.StoreTestingKnobs{
				TestingRangefeedFilter: interceptSettingsFeed,
			},
		},
	})
	defer srv.Stopper().Stop(ctx)
	defer stopFilter()
	runner := sqlutils.MakeSQLRunner(sqlDB)

	const (
		defaultValue = "foo"
		settingName  = "test_setting"
	)

	testSetting := settings.RegisterStringSetting(
		settings.TenantWritable, settingName, "for testing", defaultValue,
	)

	// readSettingKV scans the real cluster's settings table and returns the
	// raw KV of the test setting with any tenant prefix removed from the key.
	// The result is the template for the events injected below.
	readSettingKV := func(t *testing.T) roachpb.KeyValue {
		codec := srv.ExecutorConfig().(sql.ExecutorConfig).Codec
		start := codec.TablePrefix(keys.SettingsTableID)
		rows, err := srv.DB().Scan(ctx, start, start.PrefixEnd(), 0 /* maxRows */)
		require.NoError(t, err)
		decoder := settingswatcher.MakeRowDecoder(codec)
		for _, row := range rows {
			candidate := roachpb.KeyValue{Key: row.Key}
			if row.Value != nil {
				candidate.Value = *row.Value
			}
			name, _, _, err := decoder.DecodeRow(candidate)
			require.NoError(t, err)
			if name != settingName {
				continue
			}
			candidate.Key, err = codec.StripTenantPrefix(candidate.Key)
			require.NoError(t, err)
			candidate.Value.ClearChecksum()
			candidate.Value.InitChecksum(candidate.Key)
			return candidate
		}
		t.Fatalf("failed to find setting %v", settingName)
		return roachpb.KeyValue{} // unreachable
	}

	// makeEvent turns a prefix-less KV into a RangeFeedEvent addressed to the
	// made-up tenant and stamped with ts.
	makeEvent := func(kv roachpb.KeyValue, ts hlc.Timestamp) *roachpb.RangeFeedEvent {
		prefixedKey := append(tenantCodec.TenantPrefix(), kv.Key...)
		value := kv.Value
		value.Timestamp = ts
		value.ClearChecksum()
		value.InitChecksum(prefixedKey)
		return &roachpb.RangeFeedEvent{
			Val: &roachpb.RangeFeedValue{Key: prefixedKey, Value: value},
		}
	}

	watchedSettings := cluster.MakeTestingClusterSettings()
	currentValue := func() string {
		return testSetting.Get(&watchedSettings.SV)
	}
	// expectSoon waits for the watched setting to take the value want.
	expectSoon := func(t *testing.T, want string) {
		testutils.SucceedsSoon(t, func() error {
			got := currentValue()
			if got == want {
				return nil
			}
			return errors.Errorf("expected %v, got %v", want, got)
		})
	}
	// expectUnchanged asserts the watched setting equals want both now and
	// after a brief pause.
	expectUnchanged := func(t *testing.T, want string) {
		const pause = 10 * time.Millisecond
		require.Equal(t, want, currentValue())
		time.Sleep(pause)
		require.Equal(t, want, currentValue())
	}

	watcher := settingswatcher.New(
		srv.Clock(),
		tenantCodec,
		watchedSettings,
		srv.RangeFeedFactory().(*rangefeed.Factory),
		srv.Stopper(),
		nil,
	)
	// Start the watcher, check that the default is in effect, and take hold of
	// the intercepted rangefeed stream.
	require.NoError(t, watcher.Start(ctx))
	require.Equal(t, defaultValue, currentValue())
	stream := <-streamCh
	require.Equal(t, defaultValue, currentValue())
	inject := func(kv roachpb.KeyValue, ts hlc.Timestamp) {
		require.NoError(t, stream.Send(makeEvent(kv, ts)))
	}

	// Obtain a well-formed KV by writing the setting to the real settings
	// table; it is re-prefixed for the made-up tenant when injected.
	const updatedValue = "bar"
	runner.Exec(t, "SET CLUSTER SETTING "+settingName+" = $1", updatedValue)
	setKV := readSettingKV(t)

	ts0 := srv.Clock().Now()
	ts1 := ts0.Next()
	ts2 := ts1.Next()
	deleteKV := setKV
	deleteKV.Value.RawBytes = nil

	// A value at ts1 takes effect.
	inject(setKV, ts1)
	expectSoon(t, updatedValue)

	// A deletion older than that value is ignored.
	inject(deleteKV, ts0)
	expectUnchanged(t, updatedValue)

	// A newer deletion restores the default, and replaying the older value
	// afterwards does not bring it back.
	inject(deleteKV, ts2)
	expectSoon(t, defaultValue)
	inject(setKV, ts1)
	expectUnchanged(t, defaultValue)
}

0 comments on commit a79439e

Please sign in to comment.