86436: kvserver: incorporate remote tracing spans from snapshots r=AlexTalks a=AlexTalks

This adds the tracing spans collected on the receiver side of a snapshot
to the `SnapshotResponse` object, so that those remote traces are
incorporated into the client's (i.e. the sender's) trace context.

Release justification: Low-risk observability change.
Release note: None
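
In code, the sender-side pattern this introduces is roughly the following
(a condensed sketch; `importSnapshotTrace` is a hypothetical helper, while
the real change inlines this logic in `sendSnapshot` and `delegateSnapshot`
in `store_snapshot.go`, shown below):

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// importSnapshotTrace merges the receiver's recorded spans, carried back
// in the SnapshotResponse, into the sender's current tracing span, if one
// is set up on the context.
func importSnapshotTrace(ctx context.Context, resp *kvserverpb.SnapshotResponse) {
	if sp := tracing.SpanFromContext(ctx); sp != nil {
		sp.ImportRemoteRecording(resp.CollectedSpans)
	}
}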

86676: clusterversion, kvserver: remove SpanConfig related version gates r=celiala a=arulajmani

Remove the EnsureSpanConfigReconciliation, EnsureSpanConfigSubscription,
and EnableSpanConfigStore version gates.

References cockroachdb#80663
Subsumes cockroachdb#85848

Release justification: cleanup
Release note: None
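
For background, a version gate is checked with `IsActive`; the pattern
these keys enabled looked roughly like the following sketch (not the exact
code; the deleted check from `store.go` appears verbatim further down in
this diff):

// Sketch of a version-gate check. Once a gate is guaranteed active on
// every supported upgrade path, both the check and its key can be
// deleted, which is what this commit does.
if settings.Version.IsActive(ctx, clusterversion.EnableSpanConfigStore) {
	// New behavior: consult the span configs infrastructure.
} else {
	// Old behavior: fall back to the gossiped system config span.
}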

Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Arul Ajmani <[email protected]>
3 people committed Aug 25, 2022
3 parents 44e50c1 + 09a1d46 + 0900ea9 commit 9a7ec4b
Showing 12 changed files with 80 additions and 245 deletions.
2 changes: 0 additions & 2 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
@@ -58,7 +58,6 @@ go_test(
"//pkg/config",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
@@ -91,7 +90,6 @@ go_test(
"//pkg/util/stop",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
117 changes: 0 additions & 117 deletions pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go
@@ -17,24 +17,19 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

@@ -367,115 +362,3 @@ func TestTenantUpgradeFailure(t *testing.T) {
tenantInfo.v2onMigrationStopper.Stop(ctx)
})
}

// TestTenantSystemConfigUpgrade ensures that the tenant GC job uses the
// appropriate view of the GC TTL.
func TestTenantSystemConfigUpgrade(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
false, // initializeVersion
)
// Initialize the version to the BinaryMinSupportedVersion.
require.NoError(t, clusterversion.Initialize(ctx,
clusterversion.TestingBinaryMinSupportedVersion, &settings.SV))
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
// Test is designed to run within a tenant. No need
// for the test tenant here.
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: clusterversion.TestingBinaryMinSupportedVersion,
},
},
},
})
hostDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
hostDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
defer tc.Stopper().Stop(ctx)
connectToTenant := func(t *testing.T, addr string) (_ *gosql.DB, cleanup func()) {
pgURL, cleanupPGUrl := sqlutils.PGUrl(t, addr, "Tenant", url.User(username.RootUser))
tenantDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
return tenantDB, func() {
tenantDB.Close()
cleanupPGUrl()
}
}
mkTenant := func(t *testing.T, id uint64) (
tenant serverutils.TestTenantInterface,
) {
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
false, // initializeVersion
)
// Initialize the version to the minimum it could be.
require.NoError(t, clusterversion.Initialize(ctx,
clusterversion.TestingBinaryMinSupportedVersion, &settings.SV))
tenantArgs := base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(id),
TestingKnobs: base.TestingKnobs{},
Settings: settings,
}
tenant, err := tc.Server(0).StartTenant(ctx, tenantArgs)
require.NoError(t, err)
return tenant
}
const tenantID = 10
codec := keys.MakeSQLCodec(roachpb.MakeTenantID(tenantID))
tenant := mkTenant(t, tenantID)
tenantSQL, cleanup := connectToTenant(t, tenant.SQLAddr())
defer cleanup()
tenantDB := sqlutils.MakeSQLRunner(tenantSQL)
tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{"21.2"}})
tenantDB.Exec(t, "CREATE TABLE foo ()")
fooID := sqlutils.QueryTableID(t, tenantSQL, "defaultdb", "public", "foo")
tenantP := tenant.SystemConfigProvider()
ch, _ := tenantP.RegisterSystemConfigChannel()

hostDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()")
hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 111")
hostDB.Exec(t,
"ALTER TENANT $1 SET CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled = true;",
tenantID)
tenantDB.CheckQueryResultsRetry(
t, "SHOW CLUSTER SETTING sql.zone_configs.allow_for_secondary_tenant.enabled",
[][]string{{"true"}},
)
tenantVersion := func() clusterversion.ClusterVersion {
return tenant.ClusterSettings().Version.ActiveVersionOrEmpty(ctx)
}
checkConfigEqual := func(t *testing.T, exp int32) {
testutils.SucceedsSoon(t, func() error {
cfg := tenantP.GetSystemConfig()
if cfg == nil {
return errors.New("no config")
}
conf, err := tenantP.GetSystemConfig().GetZoneConfigForObject(codec, tenantVersion(), config.ObjectID(fooID))
if err != nil {
return err
}
if conf.GC.TTLSeconds != exp {
return errors.Errorf("got %d, expected %d", conf.GC.TTLSeconds, exp)
}
return nil
})
}
checkConfigEqual(t, 111)
<-ch
hostDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 112")
<-ch
checkConfigEqual(t, 112)
tenantDB.Exec(t, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()")
tenantDB.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 113")
<-ch
checkConfigEqual(t, 113)
}
15 changes: 0 additions & 15 deletions pkg/clusterversion/cockroach_versions.go
@@ -168,13 +168,6 @@ const (
// This version must be active before any ProbeRequest is issued on the
// cluster.
ProbeRequest
// EnsureSpanConfigReconciliation ensures that the host tenant has run its
// reconciliation process at least once.
EnsureSpanConfigReconciliation
// EnsureSpanConfigSubscription ensures that all KV nodes are subscribed to
// the global span configuration state, observing the entries installed as
// in EnsureSpanConfigReconciliation.
EnsureSpanConfigSubscription
// EnableSpanConfigStore enables the use of the span configs infrastructure
// in KV.
EnableSpanConfigStore
@@ -342,14 +335,6 @@ var versionsSingleton = keyedVersions{
Key: ProbeRequest,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 26},
},
{
Key: EnsureSpanConfigReconciliation,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 36},
},
{
Key: EnsureSpanConfigSubscription,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 38},
},
{
Key: EnableSpanConfigStore,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 40},
74 changes: 36 additions & 38 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion pkg/config/BUILD.bazel
@@ -17,7 +17,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/config",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/config/zonepb",
"//pkg/keys",
"//pkg/roachpb",
19 changes: 1 addition & 18 deletions pkg/config/system.go
@@ -16,7 +16,6 @@ import (
"fmt"
"sort"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -402,26 +401,10 @@ func isPseudoTableID(id uint32) bool {
// NOTE: any subzones from the zone placeholder will be automatically merged
// into the cached zone so the caller doesn't need special-case handling code.
func (s *SystemConfig) GetZoneConfigForObject(
- codec keys.SQLCodec, version clusterversion.ClusterVersion, id ObjectID,
+ codec keys.SQLCodec, id ObjectID,
) (*zonepb.ZoneConfig, error) {
var entry zoneEntry
var err error
// In the case that we've not yet ensured reconciliation of the span
// configurations, use the host-provided view of the RANGE tenants
// configuration.
//
// TODO(ajwerner,arulajmani): If the reconciliation protocol is not active,
// and this is a secondary tenant object we're trying to look up, we're in a
// bit of a pickle. This assumes that if we're in the appropriate version,
// then so too is the system tenant and things are reconciled. Is it possible
// that neither of these object IDs represent reality? It seems like after
// the host cluster has been upgraded but the tenants have not, that we're
// in a weird intermediate state whereby the system tenant's config is no
// longer respected, but neither is the secondary tenant's.
if !codec.ForSystemTenant() &&
(id == 0 || !version.IsActive(clusterversion.EnableSpanConfigStore)) {
codec, id = keys.SystemSQLCodec, keys.TenantsRangesID
}
entry, err = s.getZoneEntry(codec, id)
if err != nil {
return nil, err
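
Call sites drop the version argument accordingly; a condensed before/after
(mirroring the `gcjob` and `opt_catalog` hunks later in this diff):

// Before: callers threaded the active cluster version through.
//   v := execCfg.Settings.Version.ActiveVersionOrEmpty(ctx)
//   zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, v, config.ObjectID(tableID))
// After: the version parameter is gone, along with the gate it served.
zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, config.ObjectID(tableID))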
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
@@ -214,6 +214,9 @@ message SnapshotResponse {
Status status = 1;
string message = 2;
reserved 3;

// Traces from snapshot processing, returned on status APPLIED or ERROR.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 4 [(gogoproto.nullable) = false];
}

// DelegateSnapshotRequest is the request used to delegate send snapshot requests.
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/storage_services.proto
@@ -17,6 +17,14 @@ import "kv/kvserver/api.proto";

service MultiRaft {
rpc RaftMessageBatch (stream cockroach.kv.kvserver.kvserverpb.RaftMessageRequestBatch) returns (stream cockroach.kv.kvserver.kvserverpb.RaftMessageResponse) {}
// RaftSnapshot asks the server to accept and apply a range snapshot.
// The client is expected to initially send a message consisting solely of
// a Header, upon which the server will respond with a message with status
// ACCEPTED, or ERROR if it cannot accept the snapshot. Once accepted, the
// client will send multiple messages with KVBatch data followed by a
// terminal message with the final flag set to true. Once finalized,
// the server will ultimately send a message back with status APPLIED, or
// ERROR, including any collected traces from processing.
rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {}
// DelegateRaftSnapshot asks the server to send a range snapshot to a target
// (so the client delegates the sending of the snapshot to the server). The
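
The comment above describes the snapshot streaming protocol; from the
client side it can be sketched as follows (hypothetical helper and stream
type names, with error handling condensed; the real sender in
`store_snapshot.go` also handles throttling, metrics, and batch
construction):

// runSnapshotStream is a hypothetical sketch of the RaftSnapshot client flow.
func runSnapshotStream(
	ctx context.Context,
	stream kvserver.MultiRaft_RaftSnapshotClient,
	header *kvserverpb.SnapshotRequest_Header,
	batches [][]byte,
) error {
	// 1. Send a message consisting solely of the Header.
	if err := stream.Send(&kvserverpb.SnapshotRequest{Header: header}); err != nil {
		return err
	}
	// 2. Wait for ACCEPTED (or ERROR) before streaming any data.
	resp, err := stream.Recv()
	if err != nil {
		return err
	}
	if resp.Status == kvserverpb.SnapshotResponse_ERROR {
		return errors.Newf("snapshot rejected: %s", resp.Message)
	}
	// 3. Stream KVBatch data, then a terminal message with the final flag set.
	for _, b := range batches {
		if err := stream.Send(&kvserverpb.SnapshotRequest{KVBatch: b}); err != nil {
			return err
		}
	}
	if err := stream.Send(&kvserverpb.SnapshotRequest{Final: true}); err != nil {
		return err
	}
	// 4. Expect APPLIED (or ERROR), now carrying any collected trace spans,
	// which the sender folds into its own span (see store_snapshot.go below).
	if resp, err = stream.Recv(); err != nil {
		return err
	}
	if sp := tracing.SpanFromContext(ctx); sp != nil {
		sp.ImportRemoteRecording(resp.CollectedSpans)
	}
	if resp.Status != kvserverpb.SnapshotResponse_APPLIED {
		return errors.Newf("snapshot failed: %s", resp.Message)
	}
	return nil
}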
26 changes: 1 addition & 25 deletions pkg/kv/kvserver/store.go
@@ -2184,29 +2184,6 @@ func (s *Store) GetConfReader(ctx context.Context) (spanconfig.StoreReader, erro
return nil, errSysCfgUnavailable
}

// We need a version gate here before switching over to the span configs
// infrastructure. In a mixed-version cluster we need to wait for
// the host tenant to have fully populated `system.span_configurations`
// (read: reconciled) at least once before using it as a view for all
// split/config decisions.
_ = clusterversion.EnsureSpanConfigReconciliation
//
// We also want to ensure that the KVSubscriber on each store is at least as
// up-to-date as some full reconciliation timestamp.
_ = clusterversion.EnsureSpanConfigSubscription
//
// Without a version gate, it would be possible for a replica on a
// new-binary-server to apply the static fallback config (assuming no
// entries in `system.span_configurations`), in violation of explicit
// configs directly set by the user. Though unlikely, it's also possible for
// us to merge all ranges into a single one -- with no entries in
// system.span_configurations, the infrastructure can erroneously conclude
// that there are zero split points.
//
// We achieve all this through a three-step migration process, culminating
// in the following cluster version gate:
_ = clusterversion.EnableSpanConfigStore

if s.cfg.SpanConfigsDisabled ||
!spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) ||
s.TestingKnobs().UseSystemConfigSpanForQueues {
@@ -2395,8 +2372,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
}

if s.cfg.SpanConfigsDisabled ||
- !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) ||
- !s.cfg.Settings.Version.IsActive(ctx, clusterversion.EnableSpanConfigStore) {
+ !spanconfigstore.EnabledSetting.Get(&s.ClusterSettings().SV) {
repl.SetSpanConfig(conf)
}

45 changes: 28 additions & 17 deletions pkg/kv/kvserver/store_snapshot.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
@@ -906,6 +907,8 @@ func (s *Store) checkSnapshotOverlapLocked(
func (s *Store) receiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream,
) error {
sp := tracing.SpanFromContext(ctx)

// Draining nodes will generally not be rebalanced to (see the filtering that
// happens in getStoreListFromIDsLocked()), but in case they are, they should
// reject the incoming rebalancing snapshots.
@@ -1028,29 +1031,43 @@ func (s *Store) receiveSnapshot(
s.metrics.RangeSnapshotUnknownRcvdBytes.Inc(inc)
}
}
- ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data")
+ ctx, rSp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "receive snapshot data")
+ defer rSp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors
inSnap, err := ss.Receive(ctx, stream, *header, recordBytesReceived)
- sp.Finish() // Ensure that the tracing span is closed, even if ss.Receive errors
if err != nil {
return err
}
inSnap.placeholder = placeholder

rec := sp.GetConfiguredRecording()

// Use a background context for applying the snapshot, as handleRaftReady is
// not prepared to deal with arbitrary context cancellation. Also, we've
// already received the entire snapshot here, so there's no point in
// abandoning application half-way through if the caller goes away.
applyCtx := s.AnnotateCtx(context.Background())
if err := s.processRaftSnapshotRequest(applyCtx, header, inSnap); err != nil {
- return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot"))
+ return sendSnapshotErrorWithTrace(stream,
+ errors.Wrap(err.GoError(), "failed to apply snapshot"), rec,
+ )
}
- return stream.Send(&kvserverpb.SnapshotResponse{Status: kvserverpb.SnapshotResponse_APPLIED})
+ return stream.Send(&kvserverpb.SnapshotResponse{
+ Status: kvserverpb.SnapshotResponse_APPLIED,
+ CollectedSpans: rec,
+ })
}

func sendSnapshotError(stream incomingSnapshotStream, err error) error {
return sendSnapshotErrorWithTrace(stream, err, nil /* trace */)
}

func sendSnapshotErrorWithTrace(
stream incomingSnapshotStream, err error, trace tracingpb.Recording,
) error {
return stream.Send(&kvserverpb.SnapshotResponse{
Status: kvserverpb.SnapshotResponse_ERROR,
Message: err.Error(),
+ CollectedSpans: trace,
})
}

@@ -1449,6 +1466,7 @@ func sendSnapshot(
}
switch resp.Status {
case kvserverpb.SnapshotResponse_ERROR:
sp.ImportRemoteRecording(resp.CollectedSpans)
storePool.Throttle(storepool.ThrottleFailed, resp.Message, to.StoreID)
return errors.Errorf("%s: remote couldn't accept %s with error: %s",
to, snap, resp.Message)
@@ -1526,6 +1544,7 @@ func sendSnapshot(
if err != nil {
return errors.Wrapf(err, "%s: remote failed to apply snapshot", to)
}
sp.ImportRemoteRecording(resp.CollectedSpans)
// NB: wait for EOF which ensures that all processing on the server side has
// completed (such as defers that might be run after the previous message was
// received).
@@ -1601,17 +1620,9 @@ func delegateSnapshot(
unexpectedResp,
)
}
- // Import the remotely collected spans, if any.
- if len(resp.CollectedSpans) != 0 {
- span := tracing.SpanFromContext(ctx)
- if span == nil {
- log.Warningf(
- ctx,
- "trying to ingest remote spans but there is no recording span set up",
- )
- } else {
- span.ImportRemoteRecording(resp.CollectedSpans)
- }
+ sp := tracing.SpanFromContext(ctx)
+ if sp != nil {
+ sp.ImportRemoteRecording(resp.CollectedSpans)
+ }
switch resp.SnapResponse.Status {
case kvserverpb.SnapshotResponse_ERROR:
6 changes: 2 additions & 4 deletions pkg/sql/gcjob/refresh_statuses.go
@@ -108,8 +108,7 @@ func updateStatusForGCElements(
if err != nil {
return err
}
- v := execCfg.Settings.Version.ActiveVersionOrEmpty(ctx)
- zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, v, config.ObjectID(tableID))
+ zoneCfg, err := cfg.GetZoneConfigForObject(execCfg.Codec, config.ObjectID(tableID))
if err != nil {
log.Errorf(ctx, "zone config for desc: %d, err = %+v", tableID, err)
return nil
@@ -462,8 +461,7 @@ func refreshTenant(
tenID := details.Tenant.ID
cfg := execCfg.SystemConfig.GetSystemConfig()
tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds
- v := execCfg.Settings.Version.ActiveVersionOrEmpty(ctx)
- zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, v, keys.TenantsRangesID)
+ zoneCfg, err := cfg.GetZoneConfigForObject(keys.SystemSQLCodec, keys.TenantsRangesID)
if err == nil {
tenantTTLSeconds = zoneCfg.GC.TTLSeconds
} else {
9 changes: 1 addition & 8 deletions pkg/sql/opt_catalog.go
@@ -15,7 +15,6 @@ import (
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/geo/geoindex"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
@@ -550,7 +549,7 @@ func (oc *optCatalog) getZoneConfig(desc catalog.TableDescriptor) (cat.Zone, err
return emptyZoneConfig, nil
}
zone, err := oc.cfg.GetZoneConfigForObject(
- oc.codec(), oc.version(), config.ObjectID(desc.GetID()),
+ oc.codec(), config.ObjectID(desc.GetID()),
)
if err != nil {
return nil, err
@@ -566,12 +565,6 @@ func (oc *optCatalog) codec() keys.SQLCodec {
return oc.planner.ExecCfg().Codec
}

func (oc *optCatalog) version() clusterversion.ClusterVersion {
return oc.planner.ExecCfg().Settings.Version.ActiveVersionOrEmpty(
oc.planner.EvalContext().Context,
)
}

// optView is a wrapper around catalog.TableDescriptor that implements
// the cat.Object, cat.DataSource, and cat.View interfaces.
type optView struct {
