Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: adopt for settings rangefeed-backed settingswatcher, remove g… #69269

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ go_library(
"server_sql.go",
"server_systemlog_gc.go",
"settings_cache.go",
"settingsworker.go",
"sql_stats.go",
"statement_diagnostics_requests.go",
"statements.go",
Expand Down Expand Up @@ -297,7 +296,7 @@ go_test(
"server_systemlog_gc_test.go",
"server_test.go",
"settings_cache_test.go",
"settingsworker_test.go",
"settings_test.go",
"statements_test.go",
"stats_test.go",
"status_test.go",
Expand All @@ -321,6 +320,7 @@ go_test(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
Expand Down
6 changes: 5 additions & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
})
registry.AddMetricStruct(kvProber.Metrics())

settingsWriter := &settingsCacheWriter{eng: engines[0]}
sqlServer, err := newSQLServer(ctx, sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(sStatus),
Expand Down Expand Up @@ -824,6 +825,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
regionsServer: sStatus,
tenantUsageServer: tenantUsage,
monitorAndMetrics: sqlMonitorAndMetrics,
settingsStorage: settingsWriter,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1545,7 +1547,9 @@ func (s *Server) PreStart(ctx context.Context) error {

// Apply any cached initial settings (and start the gossip listener) as early
// as possible, to avoid spending time with stale settings.
if err := s.refreshSettings(state.initialSettingsKVs); err != nil {
if err := initializeCachedSettings(
ctx, keys.SystemSQLCodec, s.st.MakeUpdater(), state.initialSettingsKVs,
); err != nil {
return errors.Wrap(err, "during initializing settings updater")
}

Expand Down
19 changes: 9 additions & 10 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ type sqlServerArgs struct {

// monitorAndMetrics contains the return value of newRootSQLMemoryMonitor.
monitorAndMetrics monitorAndMetrics

// settingsStorage is an optional interface to drive storing of settings
// data on disk to provide a fresh source of settings upon next startup.
settingsStorage settingswatcher.Storage
}

type monitorAndMetrics struct {
Expand Down Expand Up @@ -936,12 +940,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
reporter.TestingKnobs = &cfg.TestingKnobs.Server.(*TestingKnobs).DiagnosticsTestingKnobs
}

var settingsWatcher *settingswatcher.SettingsWatcher
if !codec.ForSystemTenant() {
settingsWatcher = settingswatcher.New(
cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper,
)
}
settingsWatcher := settingswatcher.New(
cfg.clock, codec, cfg.Settings, cfg.rangeFeedFactory, cfg.stopper, cfg.settingsStorage,
)

return &SQLServer{
ambientCtx: cfg.BaseConfig.AmbientCtx,
Expand Down Expand Up @@ -1146,10 +1147,8 @@ func (s *SQLServer) preStart(
bootstrapVersion = roachpb.Version{Major: 20, Minor: 1, Internal: 1}
}

if s.settingsWatcher != nil {
if err := s.settingsWatcher.Start(ctx); err != nil {
return errors.Wrap(err, "initializing settings")
}
if err := s.settingsWatcher.Start(ctx); err != nil {
return errors.Wrap(err, "initializing settings")
}

// Run startup migrations (note: these depend on jobs subsystem running).
Expand Down
33 changes: 33 additions & 0 deletions pkg/server/settings_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

type settingsCacheWriter struct {
eng storage.Engine
}

func (s settingsCacheWriter) WriteKVs(ctx context.Context, kvs []roachpb.KeyValue) error {
return storeCachedSettingsKVs(ctx, s.eng, kvs)
}

var _ settingswatcher.Storage = (*settingsCacheWriter)(nil)

// storeCachedSettingsKVs stores or caches node's settings locally.
// This helps in restoring the node restart with the at least the same settings with which it died.
func storeCachedSettingsKVs(ctx context.Context, eng storage.Engine, kvs []roachpb.KeyValue) error {
Expand Down Expand Up @@ -64,3 +78,22 @@ func loadCachedSettingsKVs(_ context.Context, eng storage.Engine) ([]roachpb.Key
}
return settingsKVs, nil
}

func initializeCachedSettings(
ctx context.Context, codec keys.SQLCodec, updater settings.Updater, kvs []roachpb.KeyValue,
) error {
dec := settingswatcher.MakeRowDecoder(codec)
for _, kv := range kvs {
settings, val, valType, _, err := dec.DecodeRow(kv)
if err != nil {
return errors.Wrap(err, `while decoding settings data
-this likely indicates the settings table structure or encoding has been altered;
-skipping settings updates`)
}
if err := updater.Set(ctx, settings, val, valType); err != nil {
log.Warningf(ctx, "setting %q to %q failed: %+v", settings, val, err)
}
}
updater.ResetRemaining(ctx)
return nil
}
5 changes: 4 additions & 1 deletion pkg/server/settings_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -73,9 +75,10 @@ func TestCachedSettingsServerRestart(t *testing.T) {
},
},
}

var settingsCache []roachpb.KeyValue
testServer, _, _ := serverutils.StartServer(t, serverArgs)
closedts.TargetDuration.Override(ctx, &testServer.ClusterSettings().SV, 10*time.Millisecond)
closedts.SideTransportCloseInterval.Override(ctx, &testServer.ClusterSettings().SV, 10*time.Millisecond)
testutils.SucceedsSoon(t, func() error {
store, err := testServer.GetStores().(*kvserver.Stores).GetStore(1)
if err != nil {
Expand Down
File renamed without changes.
5 changes: 5 additions & 0 deletions pkg/server/settingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
"//pkg/roachpb:with-mocks",
"//pkg/settings",
"//pkg/settings/cluster",
Expand All @@ -28,6 +29,8 @@ go_library(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/syncutil/singleflight",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -43,6 +46,7 @@ go_test(
":settingswatcher",
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -56,6 +60,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
Loading