From 8132e00bc7cd09bb346fb36427f16f4fcfeeda3a Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 28 Apr 2021 11:27:00 +0100 Subject: [PATCH] changefeedccl: fail changefeeds when tables go offline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In 20.2.4, a changefeed would fail if IMPORT INTO was run against one of its target tables. The failure would look like: ``` I210428 10:45:57.982012 2015 jobs/registry.go:1131 ⋮ [n1] CHANGEFEED job 653840730248282113: stepping through state failed with error: ‹relation› ‹"test"› is offline: ‹importing› (1) Wraps: (2) attached stack trace -- stack trace: | github.com/cockroachdb/cockroach/pkg/sql/catalog.FilterDescriptorState | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/descriptor.go:387 | github.com/cockroachdb/cockroach/pkg/sql/catalog/lease.storage.acquire.func1 | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/lease/lease.go:219 | github.com/cockroachdb/cockroach/pkg/kv.(*DB).Txn.func1 | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/kv/db.go:758 | github.com/cockroachdb/cockroach/pkg/kv.(*Txn).exec | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/kv/txn.go:808 | github.com/cockroachdb/cockroach/pkg/kv.(*DB).Txn | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/kv/db.go:757 | github.com/cockroachdb/cockroach/pkg/sql/catalog/lease.storage.acquire | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/lease/lease.go:193 | github.com/cockroachdb/cockroach/pkg/sql/catalog/lease.acquireNodeLease.func1 | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/lease/lease.go:859 | github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight.(*Group).doCall | /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight/singleflight.go:128 | runtime.goexit | /usr/local/Cellar/go/1.16.3/libexec/src/runtime/asm_amd64.s:1371 Wraps: (3) ‹relation› ‹"test"› is offline: ‹importing› Error types: (1) *catalog.inactiveDescriptorError (2) *withstack.withStack (3) *errutil.leafError ``` We want changefeed to fail when IMPORT INTO is run because changes via the AddSSTable mechanism is not currently reflected in the changefeed, meaning we would fail to emit imported data. The previous path that raised this failure depended on: 1) The descriptor being offline at the point we attempted to acquire a lease on it: https://github.com/cockroachdb/cockroach/blob/d1962910b58005096ce411bccbaddcd0c1d30cbd/pkg/ccl/changefeedccl/schemafeed/schema_feed.go#L514 2) The lease acquisition filtering out offline descriptors with an error: https://github.com/cockroachdb/cockroach/blob/eda2309728392593162e962a61182eab6ab003ff/pkg/sql/catalog/descriptor.go#L209 3) The failure from the lease acquisition in the schemafeed being treated as a fatal error. I believe our behaviour here has changed a few times on both the 20.2 branch and master because of changes in each of these 3 behaviours. In this change, rather than relying on the lease acquisition, we specifically check for offline tables in our ValidateTable function. This function is called for every descriptor version we get from the ExportRequest on the Descriptor table. Currently, I believe that checking for the offline descriptors is correct since it appears that only restore and import put tables into an offline state. Release note (enterprise change): CHANGEFEEDs more reliably fail when IMPORT INTO is run against a targeted table. Fixes #64276 See also #62585, #43784 --- pkg/ccl/changefeedccl/cdctest/testfeed.go | 6 +- pkg/ccl/changefeedccl/changefeed_test.go | 73 ++++++++++++++----- .../changefeedccl/changefeedbase/validate.go | 4 + 3 files changed, 65 insertions(+), 18 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index 1c86fb8f8832..a45a5e30d4d6 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -63,7 +63,11 @@ func (m TestFeedMessage) String() string { return fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value) } -// TestFeed abstracts over reading from the various types of changefeed sinks. +// TestFeed abstracts over reading from the various types of +// changefeed sinks. +// +// TODO(ssd): These functions need to take a context or otherwise +// allow us to time them out safely. type TestFeed interface { // Partitions returns the domain of values that may be returned as a partition // by Next. diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 04235c2d4485..777628af62f4 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -13,6 +13,8 @@ import ( gosql "database/sql" "fmt" "math" + "net/http" + "net/http/httptest" "net/url" "regexp" "sort" @@ -1180,31 +1182,68 @@ func TestChangefeedAuthorization(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) } -func TestChangefeedFailOnRBRChange(t *testing.T) { +func requireErrorSoon( + ctx context.Context, t *testing.T, f cdctest.TestFeed, errRegex *regexp.Regexp, +) { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + done := make(chan struct{}) + go func() { + if _, err := f.Next(); err != nil { + assert.Regexp(t, errRegex, err) + done <- struct{}{} + } + }() + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for changefeed to fail") + case <-done: + } +} + +func TestChangefeedFailOnTableOffline(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`) - assertRBRError := func(ctx context.Context, f cdctest.TestFeed) { - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - done := make(chan struct{}) - go func() { - if _, err := f.Next(); err != nil { - assert.Regexp(t, rbrErrorRegex, err) - done <- struct{}{} + dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + if _, err := w.Write([]byte("42,42\n")); err != nil { + t.Logf("failed to write: %s", err.Error()) } - }() - select { - case <-ctx.Done(): - t.Fatal("timed out waiting for changefeed to fail") - case <-done: } + })) + defer dataSrv.Close() + + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'") + t.Run("import fails changefeed", func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE for_import (a INT PRIMARY KEY, b INT)`) + defer sqlDB.Exec(t, `DROP TABLE for_import`) + sqlDB.Exec(t, `INSERT INTO for_import VALUES (0, NULL)`) + forImport := feed(t, f, `CREATE CHANGEFEED FOR for_import `) + defer closeFeed(t, forImport) + assertPayloads(t, forImport, []string{ + `for_import: [0]->{"after": {"a": 0, "b": null}}`, + }) + sqlDB.Exec(t, `IMPORT INTO for_import CSV DATA ($1)`, dataSrv.URL) + requireErrorSoon(context.Background(), t, forImport, + regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`)) + }) } + t.Run(`sinkless`, sinklessTest(testFn)) + t.Run("enterprise", enterpriseTest(testFn)) +} + +func TestChangefeedFailOnRBRChange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`) testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'") - t.Run("regional by row", func(t *testing.T) { + t.Run("regional by row change fails changefeed", func(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT)`) defer sqlDB.Exec(t, `DROP TABLE rbr`) sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`) @@ -1216,7 +1255,7 @@ func TestChangefeedFailOnRBRChange(t *testing.T) { `rbr: [1]->{"after": {"a": 1, "b": 2}}`, }) sqlDB.Exec(t, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`) - assertRBRError(context.Background(), rbr) + requireErrorSoon(context.Background(), t, rbr, rbrErrorRegex) }) } withTestServerRegion := func(args *base.TestServerArgs) { diff --git a/pkg/ccl/changefeedccl/changefeedbase/validate.go b/pkg/ccl/changefeedccl/changefeedbase/validate.go index ce38aa8055f0..883daa30119e 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/validate.go +++ b/pkg/ccl/changefeedccl/changefeedbase/validate.go @@ -53,5 +53,9 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc return errors.Errorf(`"%s" was dropped`, t.StatementTimeName) } + if tableDesc.Offline() { + return errors.Errorf("CHANGEFEED cannot target offline table: %s (offline reason: %q)", tableDesc.GetName(), tableDesc.GetOfflineReason()) + } + return nil }