Skip to content

Commit

Permalink
streamingccl: fix cross cluster replication bug
Browse files Browse the repository at this point in the history
This change does a couple of things:

- It introduces a cluster setting to enable replication
of span configs from the source system tenant to the
destination system tenant. This setting defaults to true.
This is a new feature in 23.2, so it seems like a good
idea to have a kill switch.

- It special-cases the `pgcode.UndefinedFunction` error that
is thrown if the source cluster is on a pre-23.2 version
that does not recognize `crdb_internal.setup_span_configs_stream($1)`.
In case of this error, we simply skip replicating the span
configs. This is meant to be a short-term, backportable solution.

Release note (bug fix): Fixes a bug where replicating from
a source cluster that is on an older version (<23.2) than the destination
cluster would fail because of an undefined builtin function.
  • Loading branch information
adityamaru committed Nov 10, 2023
1 parent 128e987 commit 721641a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 2 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/streamingccl/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,14 @@ var DumpFrontierEntries = settings.RegisterDurationSetting(
0,
settings.NonNegativeDuration,
)

// ReplicateSpanConfigsEnabled is the boolean cluster setting that gates
// replication of span configurations from the source system tenant into the
// destination system tenant. It is registered as a SystemOnly setting and is
// enabled (true) by default.
var ReplicateSpanConfigsEnabled = settings.RegisterBoolSetting(
	settings.SystemOnly,
	"physical_replication.consumer.span_configs.enabled",
	"controls whether we replicate span configurations from the source "+
		"system tenant to the destination system tenant",
	true,
)
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
Expand All @@ -39,6 +40,7 @@ go_library(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgx_v4//:pgx",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ var _ Subscription = (*partitionedStreamSubscription)(nil)

// Subscribe implements the Subscription interface.
func (p *partitionedStreamSubscription) Subscribe(ctx context.Context) error {
ctx, sp := tracing.ChildSpan(ctx, "Subscription.Subscribe")
ctx, sp := tracing.ChildSpan(ctx, "partitionedStreamSubscription.Subscribe")
defer sp.Finish()

defer close(p.eventsChan)
Expand Down
14 changes: 13 additions & 1 deletion pkg/ccl/streamingccl/streamclient/span_config_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
)

Expand Down Expand Up @@ -129,7 +131,7 @@ var _ Subscription = (*spanConfigStreamSubscription)(nil)

// Subscribe implements the Subscription interface.
func (p *spanConfigStreamSubscription) Subscribe(ctx context.Context) error {
ctx, sp := tracing.ChildSpan(ctx, "Subscription.Subscribe")
ctx, sp := tracing.ChildSpan(ctx, "spanConfigStreamSubscription.Subscribe")
defer sp.Finish()

defer close(p.eventsChan)
Expand All @@ -152,6 +154,16 @@ func (p *spanConfigStreamSubscription) Subscribe(ctx context.Context) error {
defer cancelFunc()
rows, err := srcConn.Query(cancelCtx, `SELECT crdb_internal.setup_span_configs_stream($1)`, p.tenantName)
if err != nil {
// TODO(adityamaru): This is a short term fix for https://github.com/cockroachdb/cockroach/issues/113682
// to allow for a < 23.2 source cluster to replicate into a 23.2 destination
// cluster. In the long term we will want to use the source cluster's
// cluster version to gate features on the destination.
if pgErr := (*pgconn.PgError)(nil); errors.As(err, &pgErr) {
if pgcode.MakeCode(pgErr.Code) == pgcode.UndefinedFunction {
log.Warningf(ctx, "source cluster is running a version < 23.2, skipping span config replication: %v", err)
return nil
}
}
return err
}
// For close to return, the ctx passed to the query above must be cancelled.
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ func startDistIngestion(

spanConfigIngestStopper := make(chan struct{})
streamSpanConfigs := func(ctx context.Context) error {
if !streamingccl.ReplicateSpanConfigsEnabled.Get(&execCtx.ExecCfg().Settings.SV) {
log.Warningf(ctx, "span config replication is disabled")
return nil
}
if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.SkipSpanConfigReplication {
return nil
}
Expand Down

0 comments on commit 721641a

Please sign in to comment.