Skip to content

Commit

Permalink
changefeedccl: fail changefeeds when tables go offline
Browse files Browse the repository at this point in the history
In 20.2.4, a changefeed would fail if IMPORT INTO was run against one
of its target tables. The failure would look like:

```
I210428 10:45:57.982012 2015 jobs/registry.go:1131 ⋮ [n1] CHANGEFEED job 653840730248282113: stepping through state failed with error: ‹relation› ‹"test"› is offline: ‹importing›
(1)
Wraps: (2) attached stack trace
  -- stack trace:
  | github.com/cockroachdb/cockroach/pkg/sql/catalog.FilterDescriptorState
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/descriptor.go:387
  | github.com/cockroachdb/cockroach/pkg/sql/catalog/lease.storage.acquire.func1
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/lease/lease.go:219
  | github.com/cockroachdb/cockroach/pkg/kv.(*DB).Txn.func1
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/kv/db.go:758
  | github.com/cockroachdb/cockroach/pkg/kv.(*Txn).exec
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/kv/txn.go:808
  | github.com/cockroachdb/cockroach/pkg/kv.(*DB).Txn
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/kv/db.go:757
  | github.com/cockroachdb/cockroach/pkg/sql/catalog/lease.storage.acquire
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/lease/lease.go:193
  | github.com/cockroachdb/cockroach/pkg/sql/catalog/lease.acquireNodeLease.func1
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/sql/catalog/lease/lease.go:859
  | github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight.(*Group).doCall
  |     /Users/ssd/go/src/github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight/singleflight.go:128
  | runtime.goexit
  |     /usr/local/Cellar/go/1.16.3/libexec/src/runtime/asm_amd64.s:1371
Wraps: (3) ‹relation› ‹"test"› is offline: ‹importing›
Error
types: (1) *catalog.inactiveDescriptorError (2) *withstack.withStack (3) *errutil.leafError
```

We want changefeed to fail when IMPORT INTO is run because changes via
the AddSSTable mechanism is not currently reflected in the changefeed,
meaning we would fail to emit imported data.

The previous path that raised this failure depended on:

1) The descriptor being offline at the point we attempted to acquire a
   lease on it:

   https://github.com/cockroachdb/cockroach/blob/d1962910b58005096ce411bccbaddcd0c1d30cbd/pkg/ccl/changefeedccl/schemafeed/schema_feed.go#L514

2) The lease acquisition filtering out offline descriptors with an
error:

   https://github.com/cockroachdb/cockroach/blob/eda2309728392593162e962a61182eab6ab003ff/pkg/sql/catalog/descriptor.go#L209

3) The failure from the lease acquisition in the schemafeed being
   treated as a fatal error.

I believe our behaviour here has changed a few times on both the 20.2
branch and master because of changes in each of these 3 behaviours.

In this change, rather than relying on the lease acquisition, we
specifically check for offline tables in our ValidateTable
function. This function is called for every descriptor version we get
from the ExportRequest on the Descriptor table.

Currently, I believe that checking for the offline descriptors is
correct since it appears that only restore and import put tables into
an offline state.

Release note (enterprise change): CHANGEFEEDs more reliably fail when
IMPORT INTO is run against a targeted table.

Fixes cockroachdb#64276

See also cockroachdb#62585, cockroachdb#43784
  • Loading branch information
stevendanna committed Apr 28, 2021
1 parent a513986 commit 8132e00
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func (m TestFeedMessage) String() string {
return fmt.Sprintf(`%s: %s->%s`, m.Topic, m.Key, m.Value)
}

// TestFeed abstracts over reading from the various types of changefeed sinks.
// TestFeed abstracts over reading from the various types of
// changefeed sinks.
//
// TODO(ssd): These functions need to take a context or otherwise
// allow us to time them out safely.
type TestFeed interface {
// Partitions returns the domain of values that may be returned as a partition
// by Next.
Expand Down
73 changes: 56 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
gosql "database/sql"
"fmt"
"math"
"net/http"
"net/http/httptest"
"net/url"
"regexp"
"sort"
Expand Down Expand Up @@ -1180,31 +1182,68 @@ func TestChangefeedAuthorization(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestChangefeedFailOnRBRChange(t *testing.T) {
func requireErrorSoon(
ctx context.Context, t *testing.T, f cdctest.TestFeed, errRegex *regexp.Regexp,
) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
if _, err := f.Next(); err != nil {
assert.Regexp(t, errRegex, err)
done <- struct{}{}
}
}()
select {
case <-ctx.Done():
t.Fatal("timed out waiting for changefeed to fail")
case <-done:
}
}

func TestChangefeedFailOnTableOffline(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`)
assertRBRError := func(ctx context.Context, f cdctest.TestFeed) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
if _, err := f.Next(); err != nil {
assert.Regexp(t, rbrErrorRegex, err)
done <- struct{}{}
dataSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
if _, err := w.Write([]byte("42,42\n")); err != nil {
t.Logf("failed to write: %s", err.Error())
}
}()
select {
case <-ctx.Done():
t.Fatal("timed out waiting for changefeed to fail")
case <-done:
}
}))
defer dataSrv.Close()

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'")
t.Run("import fails changefeed", func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE for_import (a INT PRIMARY KEY, b INT)`)
defer sqlDB.Exec(t, `DROP TABLE for_import`)
sqlDB.Exec(t, `INSERT INTO for_import VALUES (0, NULL)`)
forImport := feed(t, f, `CREATE CHANGEFEED FOR for_import `)
defer closeFeed(t, forImport)
assertPayloads(t, forImport, []string{
`for_import: [0]->{"after": {"a": 0, "b": null}}`,
})
sqlDB.Exec(t, `IMPORT INTO for_import CSV DATA ($1)`, dataSrv.URL)
requireErrorSoon(context.Background(), t, forImport,
regexp.MustCompile(`CHANGEFEED cannot target offline table: for_import \(offline reason: "importing"\)`))
})
}
t.Run(`sinkless`, sinklessTest(testFn))
t.Run("enterprise", enterpriseTest(testFn))
}

func TestChangefeedFailOnRBRChange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`)
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'")
t.Run("regional by row", func(t *testing.T) {
t.Run("regional by row change fails changefeed", func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT)`)
defer sqlDB.Exec(t, `DROP TABLE rbr`)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`)
Expand All @@ -1216,7 +1255,7 @@ func TestChangefeedFailOnRBRChange(t *testing.T) {
`rbr: [1]->{"after": {"a": 1, "b": 2}}`,
})
sqlDB.Exec(t, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`)
assertRBRError(context.Background(), rbr)
requireErrorSoon(context.Background(), t, rbr, rbrErrorRegex)
})
}
withTestServerRegion := func(args *base.TestServerArgs) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,9 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc
return errors.Errorf(`"%s" was dropped`, t.StatementTimeName)
}

if tableDesc.Offline() {
return errors.Errorf("CHANGEFEED cannot target offline table: %s (offline reason: %q)", tableDesc.GetName(), tableDesc.GetOfflineReason())
}

return nil
}

0 comments on commit 8132e00

Please sign in to comment.