From a79eaf3893f9460d4a6f91e581d1bc4cf6a22a1c Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Wed, 28 Dec 2022 11:34:14 -0500 Subject: [PATCH] changefeedccl: add unordered flag This PR adds the WITH unordered changefeed option, which relaxes our constraints on configuration meant to preserve end-to-end ordering guarantees. Followup PRs will use this in different ways, but this one just removes the requirement to specify a region in gcpubsub. Addresses #80884. Informs #54461. Release note (enterprise change): Changefeeds with the WITH unordered flag may use multiregion Google Cloud pubsub topics. --- pkg/ccl/changefeedccl/changefeed_test.go | 4 +++ .../changefeedccl/changefeedbase/options.go | 30 ++++++++++++++++++- pkg/ccl/changefeedccl/sink.go | 2 +- pkg/ccl/changefeedccl/sink_pubsub.go | 19 +++++++++--- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 593d16301426..05300dedd1ef 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -4537,6 +4537,10 @@ func TestChangefeedErrors(t *testing.T) { `kafka://nope`, ) + // Unordered flag required for some options, disallowed for others. + sqlDB.ExpectErr(t, `resolved timestamps cannot be guaranteed to be correct in unordered mode`, `CREATE CHANGEFEED FOR foo WITH resolved, unordered`) + sqlDB.ExpectErr(t, `Use of gcpubsub without specifying a region requires the WITH unordered option.`, `CREATE CHANGEFEED FOR foo INTO "gcpubsub://foo"`) + // The topics option should not be exposed to users since it is used // internally to display topics in the show changefeed jobs query sqlDB.ExpectErr( diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index d3052abd4bd8..3bfafde72e55 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -94,6 +94,7 @@ const ( OptWebhookClientTimeout = `webhook_client_timeout` OptOnError = `on_error` OptMetricsScope = `metrics_label` + OptUnordered = `unordered` OptVirtualColumns = `virtual_columns` OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted` @@ -325,6 +326,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{ OptWebhookClientTimeout: durationOption, OptOnError: enum("pause", "fail"), OptMetricsScope: stringOption, + OptUnordered: flagOption, OptVirtualColumns: enum("omitted", "null"), } @@ -336,7 +338,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope, OptMVCCTimestamps, OptDiff, OptSplitColumnFamilies, OptSchemaChangeEvents, OptSchemaChangePolicy, OptProtectDataFromGCOnPause, OptOnError, - OptInitialScan, OptNoInitialScan, OptInitialScanOnly, + OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered, OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics) // SQLValidOptions is options exclusive to SQL sink @@ -436,6 +438,25 @@ var AlterChangefeedTargetOptions = map[string]OptionPermittedValues{ OptNoInitialScan: flagOption, } +type incompatibleOptions struct { + opt1 string + opt2 string + reason string +} + +func makeInvertedIndex(pairs []incompatibleOptions) map[string][]incompatibleOptions { + m := make(map[string][]incompatibleOptions, len(pairs)*2) + for _, p := range pairs { + m[p.opt1] = append(m[p.opt1], p) + m[p.opt2] = append(m[p.opt2], p) + } + return m +} + +var incompatibleOptionsMap = makeInvertedIndex([]incompatibleOptions{ + {opt1: OptUnordered, opt2: OptResolvedTimestamps, reason: `resolved timestamps cannot be guaranteed to be correct in unordered mode`}, +}) + // MakeStatementOptions wraps and canonicalizes the options we get // from TypeAsStringOpts or the job record. func MakeStatementOptions(opts map[string]string) StatementOptions { @@ -982,6 +1003,13 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool } } } + for o := range s.m { + for _, pair := range incompatibleOptionsMap[o] { + if s.IsSet(pair.opt1) && s.IsSet(pair.opt2) { + return errors.Newf(`%s is not usable with %s because %s`, pair.opt1, pair.opt2, pair.reason) + } + } + } return nil } diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index ad64b7e85941..cd8479b2d114 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -213,7 +213,7 @@ func getSink( }) case isPubsubSink(u): // TODO: add metrics to pubsubsink - return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg)) + return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered)) case isCloudStorageSink(u): return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) { // Placeholder id for canary sink diff --git a/pkg/ccl/changefeedccl/sink_pubsub.go b/pkg/ccl/changefeedccl/sink_pubsub.go index bd9bd12249d9..04ea1d841b21 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub.go +++ b/pkg/ccl/changefeedccl/sink_pubsub.go @@ -36,6 +36,7 @@ const credentialsParam = "CREDENTIALS" const GcpScheme = "gcpubsub" const gcpScope = "https://www.googleapis.com/auth/pubsub" const cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform" +const globalGCPEndpoint = "pubsub.googleapis.com:443" // TODO: make numOfWorkers configurable const numOfWorkers = 128 @@ -78,7 +79,7 @@ type gcpPubsubClient struct { client *pubsub.Client ctx context.Context projectID string - region string + endpoint string topicNamer *TopicNamer url sinkURL @@ -186,6 +187,7 @@ func MakePubsubSink( u *url.URL, encodingOpts changefeedbase.EncodingOptions, targets changefeedbase.Targets, + unordered bool, ) (Sink, error) { pubsubURL := sinkURL{URL: u, q: u.Query()} @@ -226,8 +228,17 @@ func MakePubsubSink( return nil, errors.New("missing project name") } region := pubsubURL.consumeParam(regionParam) + var endpoint string if region == "" { - return nil, errors.New("region query parameter not found") + if unordered { + endpoint = globalGCPEndpoint + } else { + return nil, errors.WithHintf(errors.New("region query parameter not found"), + "Use of gcpubsub without specifying a region requires the WITH %s option.", + changefeedbase.OptUnordered) + } + } else { + endpoint = gcpEndpointForRegion(region) } tn, err := MakeTopicNamer(targets, WithSingleName(pubsubTopicName)) if err != nil { @@ -237,7 +248,7 @@ func MakePubsubSink( topicNamer: tn, ctx: ctx, projectID: projectID, - region: gcpEndpointForRegion(region), + endpoint: endpoint, url: pubsubURL, } p.client = g @@ -512,7 +523,7 @@ func (p *gcpPubsubClient) init() error { p.ctx, p.projectID, creds, - option.WithEndpoint(p.region), + option.WithEndpoint(p.endpoint), ) if err != nil {