30566: sql: asynchronously drop non-interleaved indexes r=eriktrinh a=eriktrinh

This change drops non-interleaved indexes asynchronously: the deletion
of index data is deferred to an asynchronous schema changer. This is in
preparation for eventually removing index data using `ClearRange` after
the GC TTL period has passed. The initial schema changer runs through
the state machine but does not delete the index data. Instead, the
mutation is moved to a separate list and given a timestamp. The created
asynchronous schema changer uses that timestamp and the index's
configured GC TTL value to determine when it should begin execution and
actually truncate the index.
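A minimal sketch of the timing rule described above (illustrative only;
`canClearDroppedIndex` and its parameters are hypothetical names, not
code from this change):

```go
package main

import (
	"fmt"
	"time"
)

// canClearDroppedIndex reports whether the GC TTL attached to a dropped
// index's mutation has elapsed, i.e. whether the async schema changer may
// begin deleting the index data. All names here are hypothetical.
func canClearDroppedIndex(droppedAt time.Time, ttlSeconds int32, now time.Time) bool {
	deadline := droppedAt.Add(time.Duration(ttlSeconds) * time.Second)
	return !now.Before(deadline)
}

func main() {
	droppedAt := time.Now().Add(-2 * time.Hour)
	// With a one-hour TTL, an index dropped two hours ago is eligible.
	fmt.Println(canClearDroppedIndex(droppedAt, 3600, time.Now())) // true
}
```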

When the async schema changer deletes the index data, two things occur:
the job is marked as succeeded and the index's zone config is removed.

The job can immediately be marked as succeeded because currently a
separate job is created for each index that is dropped.

Interleaved indexes are unaffected and have their data deleted
immediately.

Related to #20696

Fixes #28859.

31020: cdc: Test for falling behind schema TTL r=danhhz a=mrtracy

Add a test that ensures that changefeeds properly exit if they fall far
enough behind that schema information has been lost due to the GC TTL
(that is, a historical row version can no longer be read because the
schema at its timestamp has been garbage collected).

I have also discovered why the sister test (for the table TTL, not the
schema) required a 3-second sleep: the GC queue enforces that replicas
must have an appropriately high "score" before being GCed, even when the
"shouldQueue" check is skipped. To get around this, I have changed
"ManuallyEnqueueSpan" to a more explicit "ManuallyGCSpan", which
directly calls the processing implementation of the gcQueue on the
appropriate replicas. Both that sister test and the new schema TTL
test now require only a more predictable 1-second sleep.
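A rough, self-contained illustration of that change (stand-in types only;
the real gcQueue and replicas live in pkg/storage): instead of enqueueing
a span and relying on the queue's scoring, "ManuallyGCSpan" invokes the
processing implementation directly on each overlapping replica.

```go
package main

import "fmt"

// replica and gcQueue are stand-ins for the real pkg/storage types.
type replica struct{ id int }
type gcQueue struct{}

// process stands in for the gcQueue's processing implementation.
func (q *gcQueue) process(r *replica) { fmt.Printf("GC processed replica %d\n", r.id) }

// manuallyGCSpan mirrors the described ManuallyGCSpan: it calls the queue's
// processing implementation directly on each replica overlapping the span,
// bypassing both shouldQueue and the GC-score check.
func manuallyGCSpan(q *gcQueue, overlapping []*replica) {
	for _, r := range overlapping {
		q.process(r)
	}
}

func main() {
	manuallyGCSpan(&gcQueue{}, []*replica{{id: 1}, {id: 2}})
}
```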

Resolves #28644

Release note: None

31152: changefeedccl: fix TestAvroSchema/DECIMAL flake r=mrtracy a=danhhz

The precision is really meant to be in [1,10], but it sure looks like
there's an off-by-one error in the avro library that makes this test
flake if it picks a precision of 1.

Release note: None

31154: kubernetes: Add multiregion channel, add channel to daemonset configs r=a-robinson a=a-robinson

Release note: None

Fixes #31144

Co-authored-by: Erik Trinh <[email protected]>
Co-authored-by: Matt Tracy <[email protected]>
Co-authored-by: Daniel Harrison <[email protected]>
Co-authored-by: Alex Robinson <[email protected]>
5 people committed Oct 9, 2018
5 parents b494a48 + dc79d80 + 4cab780 + 004c6cb + efde28f commit 6b912fa
Showing 22 changed files with 1,138 additions and 409 deletions.
@@ -192,7 +192,7 @@ spec:
mountPath: /cockroach/cockroach-certs
env:
- name: COCKROACH_CHANNEL
value: kubernetes-secure
value: kubernetes-multiregion
command:
- "/bin/bash"
- "-ecx"
@@ -122,6 +122,9 @@ spec:
volumeMounts:
- name: datadir
mountPath: /cockroach/cockroach-data
env:
- name: COCKROACH_CHANNEL
value: kubernetes-insecure
command:
- "/bin/bash"
- "-ecx"
@@ -240,6 +240,9 @@ spec:
mountPath: /cockroach/cockroach-data
- name: certs
mountPath: /cockroach/cockroach-certs
env:
- name: COCKROACH_CHANNEL
value: kubernetes-secure
command:
- "/bin/bash"
- "-ecx"
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/avro_test.go
@@ -221,7 +221,10 @@ func TestAvroSchema(t *testing.T) {
datum = tree.MakeDTimestamp(t, time.Microsecond)
case sqlbase.ColumnType_DECIMAL:
// TODO(dan): Make RandDatum respect Precision and Width instead.
typ.Precision = rng.Int31n(10) + 1
// TODO(dan): The precision is really meant to be in [1,10], but it
// sure looks like there's an off by one error in the avro library
// that makes this test flake if it picks precision of 1.
typ.Precision = rng.Int31n(10) + 2
typ.Width = rng.Int31n(typ.Precision + 1)
coeff := rng.Int63n(int64(math.Pow10(int(typ.Precision))))
datum = &tree.DDecimal{Decimal: *apd.New(coeff, -typ.Width)}
104 changes: 76 additions & 28 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -24,12 +24,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -890,9 +887,6 @@ func TestChangefeedDataTTL(t *testing.T) {
// versions.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

// Set a one second GC time; all historical rows subject to GC ASAP.
sqlDB.Exec(t, `ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = $1`, 1)

counter := 0
upsertRow := func() {
counter++
@@ -916,32 +910,87 @@
upsertRow()
upsertRow()

// TODO(mrtracy): Even though the GC TTL on the table is set to 1 second,
// this does not work at 1 or even 2 seconds. Investigate why this is the
// case.
time.Sleep(3 * time.Second)

// Force a GC of the table. This should cause both older versions of the
// table to be deleted, with the middle version being lost to the changefeed.
tblID, err := sqlutils.QueryTableID(sqlDB.DB, "d", "foo")
if err != nil {
t.Fatal(err)
forceTableGC(t, f.Server(), sqlDB, "d", "foo")

// Resume our changefeed normally.
atomic.StoreInt32(&shouldWait, 0)
resume <- struct{}{}

// Verify that the third call to Next() returns an error (the first is the
// initial row, the second is the first change. The third should detect the
// GC interval mismatch).
_, _, _, _, _, _ = dataExpiredRows.Next(t)
_, _, _, _, _, _ = dataExpiredRows.Next(t)
_, _, _, _, _, _ = dataExpiredRows.Next(t)
if err := dataExpiredRows.Err(); !testutils.IsError(err, `must be after replica GC threshold`) {
t.Errorf(`expected "must be after replica GC threshold" error, got: %+v`, err)
}
}

tablePrefix := keys.MakeTablePrefix(tblID)
tableStartKey := roachpb.RKey(tablePrefix)
tableSpan := roachpb.RSpan{
Key: tableStartKey,
EndKey: tableStartKey.PrefixEnd(),
t.Run("sinkless", enterpriseTest(testFn))
t.Run("enterprise", enterpriseTest(testFn))
}

// TestChangefeedSchemaTTL ensures that changefeeds fail with an error in the case
// where the feed has fallen behind the GC TTL of the table's schema.
func TestChangefeedSchemaTTL(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
// Set a very simple channel-based, wait-and-resume function as the
// BeforeEmitRow hook.
var shouldWait int32
wait := make(chan struct{})
resume := make(chan struct{})
knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs.
DistSQL.(*distsqlrun.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.BeforeEmitRow = func() error {
if atomic.LoadInt32(&shouldWait) == 0 {
return nil
}
wait <- struct{}{}
<-resume
return nil
}

ts := f.Server().(*server.TestServer)
if err := ts.GetStores().(*storage.Stores).VisitStores(func(st *storage.Store) error {
return st.ManuallyEnqueueSpan(context.Background(), "gc", tableSpan, true /* skipShouldQueue */)
}); err != nil {
t.Fatal(err)
sqlDB := sqlutils.MakeSQLRunner(db)

// Create the data table; it will only contain a single row with multiple
// versions.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)

counter := 0
upsertRow := func() {
counter++
sqlDB.Exec(t, `UPSERT INTO foo (a, b) VALUES (1, $1)`, fmt.Sprintf("version %d", counter))
}

// Create the initial version of the row and the changefeed itself. The initial
// version is necessary to prevent CREATE CHANGEFEED itself from hanging.
upsertRow()
dataExpiredRows := f.Feed(t, "CREATE CHANGEFEED FOR TABLE foo")
defer dataExpiredRows.Close(t)

// Set up our emit trap and update the row, which will allow us to "pause" the
// changefeed in order to force a GC.
atomic.StoreInt32(&shouldWait, 1)
upsertRow()
<-wait

// Upsert two additional versions. One of these will be deleted by the GC
// process before changefeed polling is resumed.
waitForSchemaChange(t, sqlDB, "ALTER TABLE foo ADD COLUMN c STRING")
upsertRow()
waitForSchemaChange(t, sqlDB, "ALTER TABLE foo ADD COLUMN d STRING")
upsertRow()

// Force a GC of the table. This should cause both older versions of the
// table to be deleted, with the middle version being lost to the changefeed.
forceTableGC(t, f.Server(), sqlDB, "system", "descriptor")

// Resume our changefeed normally.
atomic.StoreInt32(&shouldWait, 0)
resume <- struct{}{}
Expand All @@ -952,13 +1001,12 @@ func TestChangefeedDataTTL(t *testing.T) {
_, _, _, _, _, _ = dataExpiredRows.Next(t)
_, _, _, _, _, _ = dataExpiredRows.Next(t)
_, _, _, _, _, _ = dataExpiredRows.Next(t)
if err := dataExpiredRows.Err(); !testutils.IsError(err, `must be after replica GC threshold`) {
t.Errorf(`expected "must be after replica GC threshold" error got: %+v`, err)
if err := dataExpiredRows.Err(); !testutils.IsError(err, `GC threshold`) {
t.Errorf(`expected "GC threshold" error, got: %+v`, err)
}
}

// Due to the minimum 3 second run time (due to needing to wait that long for
// rows to be properly GCed), only run an enterprise test.
t.Run("sinkless", enterpriseTest(testFn))
t.Run("enterprise", enterpriseTest(testFn))
}

47 changes: 47 additions & 0 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -23,9 +23,12 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/security"
@@ -525,6 +528,25 @@ func (c *tableFeed) Close(t testing.TB) {
c.urlCleanup()
}

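// waitForSchemaChange runs the given schema change statement and blocks
// until the job it creates has succeeded.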
func waitForSchemaChange(
t testing.TB, sqlDB *sqlutils.SQLRunner, stmt string, arguments ...interface{},
) {
sqlDB.Exec(t, stmt, arguments...)
row := sqlDB.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1")
var jobID string
row.Scan(&jobID)

testutils.SucceedsSoon(t, func() error {
row := sqlDB.QueryRow(t, "SELECT status FROM [SHOW JOBS] WHERE job_id = $1", jobID)
var status string
row.Scan(&status)
if status != "succeeded" {
return fmt.Errorf("Job %s had status %s, wanted 'succeeded'", jobID, status)
}
return nil
})
}

func assertPayloads(t testing.TB, f testfeed, expected []string) {
t.Helper()

@@ -674,3 +696,28 @@ func enterpriseTest(testFn func(*testing.T, *gosql.DB, testfeedFactory)) func(*t
testFn(t, db, f)
}
}

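// forceTableGC forces a GC run over the given table's entire key span by
// sending a GCRequest with its threshold set to the current time.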
func forceTableGC(
t testing.TB,
tsi serverutils.TestServerInterface,
sqlDB *sqlutils.SQLRunner,
database, table string,
) {
t.Helper()
tblID, err := sqlutils.QueryTableID(sqlDB.DB, database, table)
if err != nil {
t.Fatal(err)
}

tblKey := roachpb.Key(keys.MakeTablePrefix(tblID))
gcr := roachpb.GCRequest{
RequestHeader: roachpb.RequestHeader{
Key: tblKey,
EndKey: tblKey.PrefixEnd(),
},
Threshold: tsi.Clock().Now(),
}
if _, err := client.SendWrapped(context.Background(), tsi.DistSender(), &gcr); err != nil {
t.Fatal(err)
}
}
90 changes: 78 additions & 12 deletions pkg/ccl/partitionccl/drop_test.go
@@ -12,19 +12,46 @@ import (
"context"
"testing"

"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
)

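// subzoneExists reports whether cfg contains a subzone entry matching the
// given index ID and partition name.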
func subzoneExists(cfg *config.ZoneConfig, index uint32, partition string) bool {
for _, s := range cfg.Subzones {
if s.IndexID == index && s.PartitionName == partition {
return true
}
}
return false
}

func TestDropIndexWithZoneConfigCCL(t *testing.T) {
defer leaktest.AfterTest(t)()

const numRows = 100

asyncNotification := make(chan struct{})

params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
AsyncExecNotification: func() error {
<-asyncNotification
return nil
},
AsyncExecQuickly: true,
},
}
s, sqlDBRaw, kvDB := serverutils.StartServer(t, params)
sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw)
defer s.Stopper().Stop(context.Background())
@@ -47,30 +74,69 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {

// Set zone configs on the primary index, secondary index, and one partition
// of the secondary index.
ttlYaml := "gc: {ttlseconds: 1}"
sqlutils.SetZoneConfig(t, sqlDB, "INDEX t.kv@primary", "")
sqlutils.SetZoneConfig(t, sqlDB, "INDEX t.kv@i", "")
sqlutils.SetZoneConfig(t, sqlDB, "PARTITION p2 OF TABLE t.kv", "")
for _, target := range []string{"t.kv@primary", "t.kv@i", "t.kv.p2"} {
if exists := sqlutils.ZoneConfigExists(t, sqlDB, target); !exists {
t.Fatalf(`zone config for %s does not exist`, target)
}
}
sqlutils.SetZoneConfig(t, sqlDB, "INDEX t.kv@i", ttlYaml)
sqlutils.SetZoneConfig(t, sqlDB, "PARTITION p2 OF TABLE t.kv", ttlYaml)

// Drop the index and verify that the zone config for the secondary index and
// its partition are removed but the zone config for the primary index
// remains.
sqlDB.Exec(t, `DROP INDEX t.kv@i`)
tests.CheckKeyCount(t, kvDB, indexSpan, 0)
// All zone configs should still exist.
var buf []byte
cfg := &config.ZoneConfig{}
sqlDB.QueryRow(t, "SELECT config FROM system.zones WHERE id = $1", tableDesc.ID).Scan(&buf)
if err := protoutil.Unmarshal(buf, cfg); err != nil {
t.Fatal(err)
}

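// Index 1 is the primary index; index 3 is the dropped secondary index i,
// to which partition p2 belongs.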
subzones := []struct {
index uint32
partition string
}{
{1, ""},
{3, ""},
{3, "p2"},
}
for _, target := range subzones {
if exists := subzoneExists(cfg, target.index, target.partition); !exists {
t.Fatalf(`zone config for %v does not exist`, target)
}
}
// Dropped indexes waiting for GC shouldn't have their zone configs visible
// via SHOW ZONE CONFIGURATIONS ..., but the configs still need to exist in
// system.zones.
for _, target := range []string{"t.kv@i", "t.kv.p2"} {
if exists := sqlutils.ZoneConfigExists(t, sqlDB, target); exists {
t.Fatalf(`zone config for %s still exists`, target)
}
}
tableDesc = sqlbase.GetTableDescriptor(kvDB, "t", "kv")
if _, _, err := tableDesc.FindIndexByName("i"); err == nil {
t.Fatalf("table descriptor still contains index after index is dropped")
}
if exists := sqlutils.ZoneConfigExists(t, sqlDB, "t.kv@primary"); !exists {
close(asyncNotification)

// Wait for index drop to complete so zone configs are updated.
testutils.SucceedsSoon(t, func() error {
if kvs, err := kvDB.Scan(context.TODO(), indexSpan.Key, indexSpan.EndKey, 0); err != nil {
return err
} else if l := 0; len(kvs) != l {
return errors.Errorf("expected %d key value pairs, but got %d", l, len(kvs))
}
return nil
})

sqlDB.QueryRow(t, "SELECT config FROM system.zones WHERE id = $1", tableDesc.ID).Scan(&buf)
if err := protoutil.Unmarshal(buf, cfg); err != nil {
t.Fatal(err)
}
if exists := subzoneExists(cfg, 1, ""); !exists {
t.Fatal("zone config for primary index removed after dropping secondary index")
}
for _, target := range []string{"t.kv@i", "t.kv.p2"} {
if exists := sqlutils.ZoneConfigExists(t, sqlDB, target); exists {
t.Fatalf(`zone config for %s still exists`, target)
for _, target := range subzones[1:] {
if exists := subzoneExists(cfg, target.index, target.partition); exists {
t.Fatalf(`zone config for %v still exists`, target)
}
}
}
