Skip to content

Commit

Permalink
sql,kvserver: stop gossiping the system config
Browse files Browse the repository at this point in the history
There were a pair of tests that didn't feel worth carrying along.
Follow up commit to remove the cluster setting.

Release note: None
  • Loading branch information
ajwerner committed Feb 16, 2022
1 parent 9f98cde commit 8878715
Show file tree
Hide file tree
Showing 71 changed files with 592 additions and 530 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-64 set the active cluster version in the format '<major>.<minor>'
version version 21.2-66 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-64</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-66</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ go_test(
"//pkg/cloud",
"//pkg/cloud/impl:cloudimpl",
"//pkg/clusterversion",
"//pkg/config",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/helpers_tenant_shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -115,3 +116,6 @@ func (t *testServerShim) Engines() []storage.Engine { panic(unsup
func (t *testServerShim) MetricsRecorder() *status.MetricsRecorder { panic(unsupportedShimMethod) }
func (t *testServerShim) CollectionFactory() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SpanConfigKVSubscriber() interface{} { panic(unsupportedShimMethod) }
func (t *testServerShim) SystemConfigProvider() config.SystemConfigProvider {
panic(unsupportedShimMethod)
}
17 changes: 11 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Connector struct {
client *client
nodeDescs map[roachpb.NodeID]*roachpb.NodeDescriptor
systemConfig *config.SystemConfig
systemConfigChannels []chan<- struct{}
systemConfigChannels map[chan<- struct{}]struct{}
}

settingsMu struct {
Expand Down Expand Up @@ -140,6 +140,7 @@ func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
}

c.mu.nodeDescs = make(map[roachpb.NodeID]*roachpb.NodeDescriptor)
c.mu.systemConfigChannels = make(map[chan<- struct{}]struct{})
c.settingsMu.allTenantOverrides = make(map[string]settings.EncodedValue)
c.settingsMu.specificOverrides = make(map[string]settings.EncodedValue)
return c
Expand Down Expand Up @@ -250,7 +251,7 @@ var gossipSubsHandlers = map[string]func(*Connector, context.Context, string, ro
// Subscribe to all *NodeDescriptor updates.
gossip.MakePrefixPattern(gossip.KeyNodeIDPrefix): (*Connector).updateNodeAddress,
// Subscribe to a filtered view of *SystemConfig updates.
gossip.KeySystemConfig: (*Connector).updateSystemConfig,
gossip.KeyDeprecatedSystemConfig: (*Connector).updateSystemConfig,
}

var gossipSubsPatterns = func() []string {
Expand Down Expand Up @@ -322,7 +323,7 @@ func (c *Connector) updateSystemConfig(ctx context.Context, key string, content
c.mu.Lock()
defer c.mu.Unlock()
c.mu.systemConfig = cfg
for _, c := range c.mu.systemConfigChannels {
for c := range c.mu.systemConfigChannels {
select {
case c <- struct{}{}:
default:
Expand All @@ -342,20 +343,24 @@ func (c *Connector) GetSystemConfig() *config.SystemConfig {

// RegisterSystemConfigChannel implements the config.SystemConfigProvider
// interface.
func (c *Connector) RegisterSystemConfigChannel() <-chan struct{} {
func (c *Connector) RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func()) {
// Create channel that receives new system config notifications. The channel
// has a size of 1 to prevent connector from having to block on it.
ch := make(chan struct{}, 1)

c.mu.Lock()
defer c.mu.Unlock()
c.mu.systemConfigChannels = append(c.mu.systemConfigChannels, ch)
c.mu.systemConfigChannels[ch] = struct{}{}

// Notify the channel right away if we have a config.
if c.mu.systemConfig != nil {
ch <- struct{}{}
}
return ch
return ch, func() {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.mu.systemConfigChannels, ch)
}
}

// RangeLookup implements the kvcoord.RangeDescriptorDB interface.
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,9 @@ func gossipEventForSystemConfig(cfg *config.SystemConfigEntries) *roachpb.Gossip
panic(err)
}
return &roachpb.GossipSubscriptionEvent{
Key: gossip.KeySystemConfig,
Key: gossip.KeyDeprecatedSystemConfig,
Content: roachpb.MakeValueFromBytesAndTimestamp(val, hlc.Timestamp{}),
PatternMatched: gossip.KeySystemConfig,
PatternMatched: gossip.KeyDeprecatedSystemConfig,
}
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
// Test config.SystemConfigProvider impl. Should not have a SystemConfig yet.
sysCfg := c.GetSystemConfig()
require.Nil(t, sysCfg)
sysCfgC := c.RegisterSystemConfigChannel()
sysCfgC, _ := c.RegisterSystemConfigChannel()
require.Len(t, sysCfgC, 0)

// Return first SystemConfig response.
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestConnectorGossipSubscription(t *testing.T) {
require.Equal(t, sysCfgEntriesUp.Values, sysCfg.Values)

// A newly registered SystemConfig channel will be immediately notified.
sysCfgC2 := c.RegisterSystemConfigChannel()
sysCfgC2, _ := c.RegisterSystemConfigChannel()
require.Len(t, sysCfgC2, 1)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2234,7 +2234,7 @@ ALTER TABLE t65064 INJECT STATISTICS '[
}
]';

query T
query T retry
SELECT * FROM [EXPLAIN SELECT * FROM t65064 WHERE username = 'kharris'] OFFSET 2
----
·
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/spanconfigccl/spanconfigcomparedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ go_test(
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/utilccl",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/systemconfigwatcher",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigtestutils",
"//pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster",
Expand Down
33 changes: 19 additions & 14 deletions pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster"
Expand Down Expand Up @@ -106,14 +106,15 @@ func TestDataDriven(t *testing.T) {
{
tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0))
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
}

spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */)
defer spanConfigTestCluster.Cleanup()

kvSubscriber := tc.Server(0).SpanConfigKVSubscriber().(spanconfig.KVSubscriber)
underlyingGossip := tc.Server(0).GossipI().(*gossip.Gossip)
systemConfig := tc.Server(0).SystemConfigProvider().(*systemconfigwatcher.Cache)

systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID)
datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
Expand Down Expand Up @@ -161,15 +162,19 @@ func TestDataDriven(t *testing.T) {
// (i) reconciliation processes;
// (ii) tenant initializations (where seed span configs are
// installed).
now := systemTenant.Clock().Now()
testutils.SucceedsSoon(t, func() error {
lastUpdated := kvSubscriber.LastUpdated()
if lastUpdated.Less(now) {
return errors.Newf("kvsubscriber last updated timestamp (%s) lagging barrier timestamp (%s)",
lastUpdated.GoTime(), now.GoTime())
}
return nil
})
checkLastUpdated := func(t *testing.T, n string, c interface{ LastUpdated() hlc.Timestamp }) {
now := systemTenant.Clock().Now()
testutils.SucceedsSoon(t, func() error {
lastUpdated := c.LastUpdated()
if lastUpdated.Less(now) {
return errors.Newf("%s last updated timestamp (%s) lagging barrier timestamp (%s)",
n, lastUpdated.GoTime(), now.GoTime())
}
return nil
})
}
checkLastUpdated(t, "kvsubscriber", kvSubscriber)
checkLastUpdated(t, "systemconfigwatcher", systemConfig)

// As for the gossiped system config span, because we're using a
// single node cluster there's no additional timestamp
Expand Down Expand Up @@ -221,7 +226,7 @@ func TestDataDriven(t *testing.T) {

var reader spanconfig.StoreReader
if version == "legacy" {
reader = underlyingGossip.GetSystemConfig()
reader = systemConfig.GetSystemConfig()
} else {
reader = kvSubscriber
}
Expand All @@ -230,7 +235,7 @@ func TestDataDriven(t *testing.T) {
return spanconfigtestutils.MaybeLimitAndOffset(t, d, "...", data)

case "diff":
var before, after spanconfig.StoreReader = underlyingGossip.GetSystemConfig(), kvSubscriber
var before, after spanconfig.StoreReader = systemConfig.GetSystemConfig(), kvSubscriber
diff, err := difflib.GetUnifiedDiffString(difflib.UnifiedDiff{
A: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, before).String()),
B: difflib.SplitLines(spanconfigtestutils.GetSplitPoints(ctx, t, after).String()),
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,7 +904,7 @@ func parseGossipValues(gossipInfo *gossip.InfoStatus) (string, error) {
return "", errors.Wrapf(err, "failed to parse value for key %q", key)
}
output = append(output, fmt.Sprintf("%q: %v", key, clusterID))
} else if key == gossip.KeySystemConfig {
} else if key == gossip.KeyDeprecatedSystemConfig {
if debugCtx.printSystemConfig {
var config config.SystemConfigEntries
if err := protoutil.Unmarshal(bytes, &config); err != nil {
Expand Down
9 changes: 8 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ const (
// engine running at the required format major version, as do all other nodes
// in the cluster.
EnablePebbleFormatVersionBlockProperties

// DisableSystemConfigGossipTrigger is a follow-up to EnableSpanConfigStore
// to disable the data propagation mechanism it and the entire spanconfig
// infrastructure obviates.
DisableSystemConfigGossipTrigger
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -443,6 +446,10 @@ var versionsSingleton = keyedVersions{
Key: EnablePebbleFormatVersionBlockProperties,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 64},
},
{
Key: DisableSystemConfigGossipTrigger,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 66},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 20 additions & 8 deletions pkg/config/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,32 @@ type SystemConfigProvider interface {
// the system config. It is notified after registration (if a system config
// is already set), and whenever a new system config is successfully
// unmarshaled.
RegisterSystemConfigChannel() <-chan struct{}
RegisterSystemConfigChannel() (_ <-chan struct{}, unregister func())
}

// EmptySystemConfigProvider is an implementation of SystemConfigProvider that
// never provides a system config.
type EmptySystemConfigProvider struct{}
// ConstantSystemConfigProvider is an implementation of SystemConfigProvider which
// always returns the same value.
type ConstantSystemConfigProvider struct {
cfg *SystemConfig
}

// NewConstantSystemConfigProvider constructs a SystemConfigProvider which
// always returns the same value.
func NewConstantSystemConfigProvider(cfg *SystemConfig) *ConstantSystemConfigProvider {
p := &ConstantSystemConfigProvider{cfg: cfg}
return p
}

// GetSystemConfig implements the SystemConfigProvider interface.
func (EmptySystemConfigProvider) GetSystemConfig() *SystemConfig {
return nil
func (c *ConstantSystemConfigProvider) GetSystemConfig() *SystemConfig {
return c.cfg
}

// RegisterSystemConfigChannel implements the SystemConfigProvider interface.
func (EmptySystemConfigProvider) RegisterSystemConfigChannel() <-chan struct{} {
func (c *ConstantSystemConfigProvider) RegisterSystemConfigChannel() (
_ <-chan struct{},
unregister func(),
) {
// The system config will never be updated, so return a nil channel.
return nil
return nil, func() {}
}
16 changes: 10 additions & 6 deletions pkg/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func New(

g.mu.Lock()
// Add ourselves as a SystemConfig watcher.
g.mu.is.registerCallback(KeySystemConfig, g.updateSystemConfig)
g.mu.is.registerCallback(KeyDeprecatedSystemConfig, g.updateSystemConfig)
// Add ourselves as a node descriptor watcher.
g.mu.is.registerCallback(MakePrefixPattern(KeyNodeIDPrefix), g.updateNodeAddress)
g.mu.is.registerCallback(MakePrefixPattern(KeyStorePrefix), g.updateStoreMap)
Expand Down Expand Up @@ -1145,18 +1145,22 @@ func (g *Gossip) RegisterCallback(pattern string, method Callback, opts ...Callb
}
}

// GetSystemConfig returns the local unmarshaled version of the system config.
// DeprecatedGetSystemConfig returns the local unmarshaled version of the system config.
// Returns nil if the system config hasn't been set yet.
func (g *Gossip) GetSystemConfig() *config.SystemConfig {
//
// TODO(ajwerner): Remove this in 22.2.
func (g *Gossip) DeprecatedGetSystemConfig() *config.SystemConfig {
g.systemConfigMu.RLock()
defer g.systemConfigMu.RUnlock()
return g.systemConfig
}

// RegisterSystemConfigChannel registers a channel to signify updates for the
// DeprecatedRegisterSystemConfigChannel registers a channel to signify updates for the
// system config. It is notified after registration (if a system config is
// already set), and whenever a new system config is successfully unmarshaled.
func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
//
// TODO(ajwerner): Remove this in 22.2.
func (g *Gossip) DeprecatedRegisterSystemConfigChannel() <-chan struct{} {
// Create channel that receives new system config notifications.
// The channel has a size of 1 to prevent gossip from having to block on it.
c := make(chan struct{}, 1)
Expand All @@ -1177,7 +1181,7 @@ func (g *Gossip) RegisterSystemConfigChannel() <-chan struct{} {
// channel.
func (g *Gossip) updateSystemConfig(key string, content roachpb.Value) {
ctx := g.AnnotateCtx(context.TODO())
if key != KeySystemConfig {
if key != KeyDeprecatedSystemConfig {
log.Fatalf(ctx, "wrong key received on SystemConfig callback: %s", key)
}
cfg := config.NewSystemConfig(g.defaultZoneConfig)
Expand Down
2 changes: 1 addition & 1 deletion pkg/gossip/infostore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestInfoStoreMostDistant(t *testing.T) {
scInfo := is.newInfo(nil, time.Second)
scInfo.Hops = 100
scInfo.NodeID = nodes[0]
if err := is.addInfo(KeySystemConfig, scInfo); err != nil {
if err := is.addInfo(KeyDeprecatedSystemConfig, scInfo); err != nil {
t.Fatal(err)
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/gossip/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,18 @@ const (
// bi-level key addressing scheme. The value is a roachpb.RangeDescriptor.
KeyFirstRangeDescriptor = "first-range"

// KeySystemConfig is the gossip key for the system DB span.
// KeyDeprecatedSystemConfig is the gossip key for the system DB span.
// The value if a config.SystemConfig which holds all key/value
// pairs in the system DB span.
KeySystemConfig = "system-db"
//
// This key is used in the 21.2<->22.1 mixed version state. It is not used
// in 22.1. However, it was written without a TTL, so there no guarantee
// that it will actually be removed from the gossip network.
//
// TODO(ajwerner): Write a migration to remove the data, or release a
// a version which drops the key entirely, and then, in a subsequent
// release, delete this key.
KeyDeprecatedSystemConfig = "system-db"

// KeyDistSQLNodeVersionKeyPrefix is key prefix for each node's DistSQL
// version.
Expand Down
Loading

0 comments on commit 8878715

Please sign in to comment.