From 0900ea9d61052db20c97711fa013b8920d3b6a2e Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Tue, 23 Aug 2022 12:16:17 -0400 Subject: [PATCH 1/2] clusterversion, kvserver: remove SpanConfig related version gates Remove EnsureSPanConfigReconciliation, EnsureSpanConfigSubscription, and EnableSpanConfigStore. References #80663 Subsumes #85848 Release justification: cleanup Release note: None --- pkg/ccl/kvccl/kvtenantccl/BUILD.bazel | 2 - .../kvccl/kvtenantccl/tenant_upgrade_test.go | 117 ------------------ pkg/clusterversion/cockroach_versions.go | 15 --- pkg/clusterversion/key_string.go | 74 ++++++----- pkg/config/BUILD.bazel | 1 - pkg/config/system.go | 19 +-- pkg/kv/kvserver/store.go | 26 +--- pkg/sql/gcjob/refresh_statuses.go | 6 +- pkg/sql/opt_catalog.go | 9 +- 9 files changed, 41 insertions(+), 228 deletions(-) diff --git a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel index 0c59cc6aab9d..3c43bcfcc2a5 100644 --- a/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel +++ b/pkg/ccl/kvccl/kvtenantccl/BUILD.bazel @@ -58,7 +58,6 @@ go_test( "//pkg/config", "//pkg/gossip", "//pkg/jobs", - "//pkg/keys", "//pkg/kv/kvclient/kvtenant", "//pkg/kv/kvserver", "//pkg/kv/kvserver/kvserverbase", @@ -91,7 +90,6 @@ go_test( "//pkg/util/stop", "//pkg/util/tracing/tracingpb", "//pkg/util/uuid", - "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go b/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go index cd1bf60af651..13647d164024 100644 --- a/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go @@ -17,16 +17,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/upgrade" @@ -34,7 +30,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -367,115 +362,3 @@ func TestTenantUpgradeFailure(t *testing.T) { tenantInfo.v2onMigrationStopper.Stop(ctx) }) } - -// TestTenantSystemConfigUpgrade ensures that the tenant GC job uses the -// appropriate view of the GC TTL. -func TestTenantSystemConfigUpgrade(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - ctx := context.Background() - settings := cluster.MakeTestingClusterSettingsWithVersions( - clusterversion.TestingBinaryVersion, - clusterversion.TestingBinaryMinSupportedVersion, - false, // initializeVersion - ) - // Initialize the version to the BinaryMinSupportedVersion. - require.NoError(t, clusterversion.Initialize(ctx, - clusterversion.TestingBinaryMinSupportedVersion, &settings.SV)) - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Settings: settings, - // Test is designed to run within a tenant. No need - // for the test tenant here. - DisableDefaultTestTenant: true, - Knobs: base.TestingKnobs{ - Server: &server.TestingKnobs{ - DisableAutomaticVersionUpgrade: make(chan struct{}), - BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion, - }, - }, - }, - }) - hostDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`) - hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) - defer tc.Stopper().Stop(ctx) - connectToTenant := func(t *testing.T, addr string) (_ *gosql.DB, cleanup func()) { - pgURL, cleanupPGUrl := sqlutils.PGUrl(t, addr, "Tenant", url.User(username.RootUser)) - tenantDB, err := gosql.Open("postgres", pgURL.String()) - require.NoError(t, err) - return tenantDB, func() { - tenantDB.Close() - cleanupPGUrl() - } - } - mkTenant := func(t *testing.T, id uint64) ( - tenant serverutils.TestTenantInterface, - ) { - settings := cluster.MakeTestingClusterSettingsWithVersions( - clusterversion.TestingBinaryVersion, - clusterversion.TestingBinaryMinSupportedVersion, - false, // initializeVersion - ) - // Initialize the version to the minimum it could be. - require.NoError(t, clusterversion.Initialize(ctx, - clusterversion.TestingBinaryMinSupportedVersion, &settings.SV)) - tenantArgs := base.TestTenantArgs{ - TenantID: roachpb.MakeTenantID(id), - TestingKnobs: base.TestingKnobs{}, - Settings: settings, - } - tenant, err := tc.Server(0).StartTenant(ctx, tenantArgs) - require.NoError(t, err) - return tenant - } - const tenantID = 10 - codec := keys.MakeSQLCodec(roachpb.MakeTenantID(tenantID)) - tenant := mkTenant(t, tenantID) - tenantSQL, cleanup := connectToTenant(t, tenant.SQLAddr()) - defer cleanup() - tenantDB := sqlutils.MakeSQLRunner(tenantSQL) - tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{"21.2"}}) - tenantDB.Exec(t, "CREATE TABLE foo ()") - fooID := sqlutils.QueryTableID(t, tenantSQL, "defaultdb", "public", "foo") - tenantP := tenant.SystemConfigProvider() - ch, _ := tenantP.RegisterSystemConfigChannel() - - hostDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()") - hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 111") - hostDB.Exec(t, - "ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true;", - tenantID) - tenantDB.CheckQueryResultsRetry( - t, "SHOW CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled", - [][]string{{"true"}}, - ) - tenantVersion := func() clusterversion.ClusterVersion { - return tenant.ClusterSettings().Version.ActiveVersionOrEmpty(ctx) - } - checkConfigEqual := func(t *testing.T, exp int32) { - testutils.SucceedsSoon(t, func() error { - cfg := tenantP.GetSystemConfig() - if cfg == nil { - return errors.New("no config") - } - conf, err := tenantP.GetSystemConfig().GetZoneConfigForObject(codec, tenantVersion(), config.ObjectID(fooID)) - if err != nil { - return err - } - if conf.GC.TTLSeconds != exp { - return errors.Errorf("got %d, expected %d", conf.GC.TTLSeconds, exp) - } - return nil - }) - } - checkConfigEqual(t, 111) - <-ch - hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 112") - <-ch - checkConfigEqual(t, 112) - tenantDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()") - tenantDB.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 113") - <-ch - checkConfigEqual(t, 113) -} diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index fece89978e08..0df42e33dbab 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -168,13 +168,6 @@ const ( // This version must be active before any ProbeRequest is issued on the // cluster. ProbeRequest - // EnsureSpanConfigReconciliation ensures that the host tenant has run its - // reconciliation process at least once. - EnsureSpanConfigReconciliation - // EnsureSpanConfigSubscription ensures that all KV nodes are subscribed to - // the global span configuration state, observing the entries installed as - // in EnsureSpanConfigReconciliation. - EnsureSpanConfigSubscription // EnableSpanConfigStore enables the use of the span configs infrastructure // in KV. EnableSpanConfigStore @@ -342,14 +335,6 @@ var versionsSingleton = keyedVersions{ Key: ProbeRequest, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 26}, }, - { - Key: EnsureSpanConfigReconciliation, - Version: roachpb.Version{Major: 21, Minor: 2, Internal: 36}, - }, - { - Key: EnsureSpanConfigSubscription, - Version: roachpb.Version{Major: 21, Minor: 2, Internal: 38}, - }, { Key: EnableSpanConfigStore, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 40}, diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 4dcc01efae23..540216642d36 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -11,47 +11,45 @@ func _() { _ = x[V21_2-0] _ = x[Start22_1-1] _ = x[ProbeRequest-2] - _ = x[EnsureSpanConfigReconciliation-3] - _ = x[EnsureSpanConfigSubscription-4] - _ = x[EnableSpanConfigStore-5] - _ = x[EnableNewStoreRebalancer-6] - _ = x[V22_1-7] - _ = x[Start22_2-8] - _ = x[LocalTimestamps-9] - _ = x[PebbleFormatSplitUserKeysMarkedCompacted-10] - _ = x[EnsurePebbleFormatVersionRangeKeys-11] - _ = x[EnablePebbleFormatVersionRangeKeys-12] - _ = x[TrigramInvertedIndexes-13] - _ = x[RemoveGrantPrivilege-14] - _ = x[MVCCRangeTombstones-15] - _ = x[UpgradeSequenceToBeReferencedByID-16] - _ = x[SampledStmtDiagReqs-17] - _ = x[AddSSTableTombstones-18] - _ = x[SystemPrivilegesTable-19] - _ = x[EnablePredicateProjectionChangefeed-20] - _ = x[AlterSystemSQLInstancesAddLocality-21] - _ = x[SystemExternalConnectionsTable-22] - _ = x[AlterSystemStatementStatisticsAddIndexRecommendations-23] - _ = x[RoleIDSequence-24] - _ = x[AddSystemUserIDColumn-25] - _ = x[SystemUsersIDColumnIsBackfilled-26] - _ = x[SetSystemUsersUserIDColumnNotNull-27] - _ = x[SQLSchemaTelemetryScheduledJobs-28] - _ = x[SchemaChangeSupportsCreateFunction-29] - _ = x[DeleteRequestReturnKey-30] - _ = x[PebbleFormatPrePebblev1Marked-31] - _ = x[RoleOptionsTableHasIDColumn-32] - _ = x[RoleOptionsIDColumnIsBackfilled-33] - _ = x[SetRoleOptionsUserIDColumnNotNull-34] - _ = x[UseDelRangeInGCJob-35] - _ = x[WaitedForDelRangeInGCJob-36] - _ = x[RangefeedUseOneStreamPerNode-37] - _ = x[NoNonMVCCAddSSTable-38] + _ = x[EnableSpanConfigStore-3] + _ = x[EnableNewStoreRebalancer-4] + _ = x[V22_1-5] + _ = x[Start22_2-6] + _ = x[LocalTimestamps-7] + _ = x[PebbleFormatSplitUserKeysMarkedCompacted-8] + _ = x[EnsurePebbleFormatVersionRangeKeys-9] + _ = x[EnablePebbleFormatVersionRangeKeys-10] + _ = x[TrigramInvertedIndexes-11] + _ = x[RemoveGrantPrivilege-12] + _ = x[MVCCRangeTombstones-13] + _ = x[UpgradeSequenceToBeReferencedByID-14] + _ = x[SampledStmtDiagReqs-15] + _ = x[AddSSTableTombstones-16] + _ = x[SystemPrivilegesTable-17] + _ = x[EnablePredicateProjectionChangefeed-18] + _ = x[AlterSystemSQLInstancesAddLocality-19] + _ = x[SystemExternalConnectionsTable-20] + _ = x[AlterSystemStatementStatisticsAddIndexRecommendations-21] + _ = x[RoleIDSequence-22] + _ = x[AddSystemUserIDColumn-23] + _ = x[SystemUsersIDColumnIsBackfilled-24] + _ = x[SetSystemUsersUserIDColumnNotNull-25] + _ = x[SQLSchemaTelemetryScheduledJobs-26] + _ = x[SchemaChangeSupportsCreateFunction-27] + _ = x[DeleteRequestReturnKey-28] + _ = x[PebbleFormatPrePebblev1Marked-29] + _ = x[RoleOptionsTableHasIDColumn-30] + _ = x[RoleOptionsIDColumnIsBackfilled-31] + _ = x[SetRoleOptionsUserIDColumnNotNull-32] + _ = x[UseDelRangeInGCJob-33] + _ = x[WaitedForDelRangeInGCJob-34] + _ = x[RangefeedUseOneStreamPerNode-35] + _ = x[NoNonMVCCAddSSTable-36] } -const _Key_name = "V21_2Start22_1ProbeRequestEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreEnableNewStoreRebalancerV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTable" +const _Key_name = "V21_2Start22_1ProbeRequestEnableSpanConfigStoreEnableNewStoreRebalancerV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTable" -var _Key_index = [...]uint16{0, 5, 14, 26, 56, 84, 105, 129, 134, 143, 158, 198, 232, 266, 288, 308, 327, 360, 379, 399, 420, 455, 489, 519, 572, 586, 607, 638, 671, 702, 736, 758, 787, 814, 845, 878, 896, 920, 948, 967} +var _Key_index = [...]uint16{0, 5, 14, 26, 47, 71, 76, 85, 100, 140, 174, 208, 230, 250, 269, 302, 321, 341, 362, 397, 431, 461, 514, 528, 549, 580, 613, 644, 678, 700, 729, 756, 787, 820, 838, 862, 890, 909} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/config/BUILD.bazel b/pkg/config/BUILD.bazel index d5f1c0a7dd0c..627af4ce31b8 100644 --- a/pkg/config/BUILD.bazel +++ b/pkg/config/BUILD.bazel @@ -17,7 +17,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/config", visibility = ["//visibility:public"], deps = [ - "//pkg/clusterversion", "//pkg/config/zonepb", "//pkg/keys", "//pkg/roachpb", diff --git a/pkg/config/system.go b/pkg/config/system.go index 668be2b6afc3..1c40610a1107 100644 --- a/pkg/config/system.go +++ b/pkg/config/system.go @@ -16,7 +16,6 @@ import ( "fmt" "sort" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -402,26 +401,10 @@ func isPseudoTableID(id uint32) bool { // NOTE: any subzones from the zone placeholder will be automatically merged // into the cached zone so the caller doesn't need special-case handling code. func (s *SystemConfig) GetZoneConfigForObject( - codec keys.SQLCodec, version clusterversion.ClusterVersion, id ObjectID, + codec keys.SQLCodec, id ObjectID, ) (*zonepb.ZoneConfig, error) { var entry zoneEntry var err error - // In the case that we've not yet ensured reconciliation of the span - // configurations, use the host-provided view of the RANGE tenants - // configuration. - // - // TODO(ajwerner,arulajmani): If the reconciliation protocol is not active, - // and this is a secondary tenant object we're trying to look up, we're in a - // bit of a pickle. This assumes that if we're in the appropriate version, - // then so too is the system tenant and things are reconciled. Is it possible - // that neither of these object IDs represent reality? It seems like after - // the host cluster has been upgraded but the tenants have not, that we're - // in a weird intermediate state whereby the system tenant's config is no - // longer respected, but neither is the secondary tenant's. - if !codec.ForSystemTenant() && - (id == 0 || !version.IsActive(clusterversion.EnableSpanConfigStore)) { - codec, id = keys.SystemSQLCodec, keys.TenantsRangesID - } entry, err = s.getZoneEntry(codec, id) if err != nil { return nil, err diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b11490877a89..53e6d3015f9d 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2184,29 +2184,6 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro return nil, errSysCfgUnavailable } - // We need a version gate here before switching over to the span configs - // infrastructure. In a mixed-version cluster we need to wait for - // the host tenant to have fully populated `system.span_configurations` - // (read: reconciled) at least once before using it as a view for all - // split/config decisions. - _ = clusterversion.EnsureSpanConfigReconciliation - // - // We also want to ensure that the KVSubscriber on each store is at least as - // up-to-date as some full reconciliation timestamp. - _ = clusterversion.EnsureSpanConfigSubscription - // - // Without a version gate, it would be possible for a replica on a - // new-binary-server to apply the static fallback config (assuming no - // entries in `system.span_configurations`), in violation of explicit - // configs directly set by the user. Though unlikely, it's also possible for - // us to merge all ranges into a single one -- with no entries in - // system.span_configurations, the infrastructure can erroneously conclude - // that there are zero split points. - // - // We achieve all this through a three-step migration process, culminating - // in the following cluster version gate: - _ = clusterversion.EnableSpanConfigStore - if s.cfg.SpanConfigsDisabled || !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) || s.TestingKnobs().UseSystemConfigSpanForQueues { @@ -2395,8 +2372,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { } if s.cfg.SpanConfigsDisabled || - !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) || - !s.cfg.Settings.Version.IsActive(ctx, clusterversion.EnableSpanConfigStore) { + !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) { repl.SetSpanConfig(conf) } diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index ba555f6f3558..b4f29361de5a 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -108,8 +108,7 @@ func updateStatusForGCElements( if err != nil { return err } - v := execCfg.Settings.Version.ActiveVersionOrEmpty(ctx) - zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, v, config.ObjectID(tableID)) + zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, config.ObjectID(tableID)) if err != nil { log.Errorf(ctx, "zone config for desc: %d, err = %+v", tableID, err) return nil @@ -462,8 +461,7 @@ func refreshTenant( tenID := details.Tenant.ID cfg := execCfg.SystemConfig.GetSystemConfig() tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds - v := execCfg.Settings.Version.ActiveVersionOrEmpty(ctx) - zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, v, keys.TenantsRangesID) + zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, keys.TenantsRangesID) if err == nil { tenantTTLSeconds = zoneCfg.GC.TTLSeconds } else { diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index 6af95fb072ed..0eb86fa7e540 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -15,7 +15,6 @@ import ( "math" "time" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/geo/geoindex" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -550,7 +549,7 @@ func (oc *optCatalog) getZoneConfig(desc catalog.TableDescriptor) (cat.Zone, err return emptyZoneConfig, nil } zone, err := oc.cfg.GetZoneConfigForObject( - oc.codec(), oc.version(), config.ObjectID(desc.GetID()), + oc.codec(), config.ObjectID(desc.GetID()), ) if err != nil { return nil, err @@ -566,12 +565,6 @@ func (oc *optCatalog) codec() keys.SQLCodec { return oc.planner.ExecCfg().Codec } -func (oc *optCatalog) version() clusterversion.ClusterVersion { - return oc.planner.ExecCfg().Settings.Version.ActiveVersionOrEmpty( - oc.planner.EvalContext().Context, - ) -} - // optView is a wrapper around catalog.TableDescriptor that implements // the cat.Object, cat.DataSource, and cat.View interfaces. type optView struct { From 09a1d4639d4a97ca36ae6e9b1d651a1f99e6d058 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Fri, 19 Aug 2022 02:46:27 -0400 Subject: [PATCH 2/2] kvserver: incorporate remote tracing spans from snapshots This adds collected tracing spans into a `SnapshotResponse` object in order to incorporate remote traces from the receiver side of a snapshot into the client's (i.e. the sender's) context. Release justification: Low-risk observability change. Release note: None --- pkg/kv/kvserver/kvserverpb/raft.proto | 3 ++ pkg/kv/kvserver/storage_services.proto | 8 +++++ pkg/kv/kvserver/store_snapshot.go | 45 ++++++++++++++++---------- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index f4db78b6a616..1b02b41d473b 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -214,6 +214,9 @@ message SnapshotResponse { Status status = 1; string message = 2; reserved 3; + + // Traces from snapshot processing, returned on status APPLIED or ERROR. + repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false]; } // DelegateSnapshotRequest is the request used to delegate send snapshot requests. diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index 0cdfbdd0002c..a876b768dbeb 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -17,6 +17,14 @@ import "kv/kvserver/api.proto"; service MultiRaft { rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {} + // RaftSnapshot asks the server to accept and apply a range snapshot. + // The client is expected to initially send a message consisting solely of + // a Header, upon which the server will respond with a message with status + // ACCEPTED, or ERROR if it cannot accept the snapshot. Once accepted, the + // client will send multiple messages with KVBatch data followed by a + // terminal message with the final flag set to true. Once finalized, + // the server will ultimately send a message back with status APPLIED, or + // ERROR, including any collected traces from processing. rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} // DelegateRaftSnapshot asks the server to send a range snapshot to a target // (so the client delegates the sending of the snapshot to the server). The diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 7803070a5f62..5f9082ae3e39 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -906,6 +907,8 @@ func (s *Store) checkSnapshotOverlapLocked( func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, ) error { + sp := tracing.SpanFromContext(ctx) + // Draining nodes will generally not be rebalanced to (see the filtering that // happens in getStoreListFromIDsLocked()), but in case they are, they should // reject the incoming rebalancing snapshots. @@ -1028,29 +1031,43 @@ func (s *Store) receiveSnapshot( s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc) } } - ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data") + ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data") + defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived) - sp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors if err != nil { return err } inSnap.placeholder = placeholder + rec := sp.GetConfiguredRecording() + // Use a background context for applying the snapshot, as handleRaftReady is // not prepared to deal with arbitrary context cancellation. Also, we've // already received the entire snapshot here, so there's no point in // abandoning application half-way through if the caller goes away. applyCtx := s.AnnotateCtx(context.Background()) if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil { - return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot")) + return sendSnapshotErrorWithTrace(stream, + errors.Wrap(err.GoError(), "failed to apply snapshot"), rec, + ) } - return stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_APPLIED}) + return stream.Send(&kvserverpb.SnapshotResponse{ + Status: kvserverpb.SnapshotResponse_APPLIED, + CollectedSpans: rec, + }) } func sendSnapshotError(stream incomingSnapshotStream, err error) error { + return sendSnapshotErrorWithTrace(stream, err, nil /* trace */) +} + +func sendSnapshotErrorWithTrace( + stream incomingSnapshotStream, err error, trace tracingpb.Recording, +) error { return stream.Send(&kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ERROR, - Message: err.Error(), + Status: kvserverpb.SnapshotResponse_ERROR, + Message: err.Error(), + CollectedSpans: trace, }) } @@ -1449,6 +1466,7 @@ func sendSnapshot( } switch resp.Status { case kvserverpb.SnapshotResponse_ERROR: + sp.ImportRemoteRecording(resp.CollectedSpans) storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID) return errors.Errorf("%s: remote couldn't accept %s with error: %s", to, snap, resp.Message) @@ -1526,6 +1544,7 @@ func sendSnapshot( if err != nil { return errors.Wrapf(err, "%s: remote failed to apply snapshot", to) } + sp.ImportRemoteRecording(resp.CollectedSpans) // NB: wait for EOF which ensures that all processing on the server side has // completed (such as defers that might be run after the previous message was // received). @@ -1601,17 +1620,9 @@ func delegateSnapshot( unexpectedResp, ) } - // Import the remotely collected spans, if any. - if len(resp.CollectedSpans) != 0 { - span := tracing.SpanFromContext(ctx) - if span == nil { - log.Warningf( - ctx, - "trying to ingest remote spans but there is no recording span set up", - ) - } else { - span.ImportRemoteRecording(resp.CollectedSpans) - } + sp := tracing.SpanFromContext(ctx) + if sp != nil { + sp.ImportRemoteRecording(resp.CollectedSpans) } switch resp.SnapResponse.Status { case kvserverpb.SnapshotResponse_ERROR: