diff --git a/.github/docker-compose.yml b/.github/docker-compose.yml index 63bbbbc8c..150f2eeda 100644 --- a/.github/docker-compose.yml +++ b/.github/docker-compose.yml @@ -20,6 +20,10 @@ services: image: cockroachdb/cockroach:latest-v21.2 network_mode: host command: start-single-node --insecure + cockroachdb-v22.1: + image: cockroachdb/cockroach:latest-v22.1 + network_mode: host + command: start-single-node --insecure mysql-v8: image: mysql:8-debian platform: linux/x86_64 diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 740720076..83856ab03 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -65,7 +65,7 @@ jobs: strategy: fail-fast: false matrix: - cockroachdb: [ v20.2, v21.1, v21.2 ] + cockroachdb: [ v20.2, v21.1, v21.2, v22.1 ] # This matrix component should use the target names listed # in the docker-compose.yml file in the parent directory. integration: diff --git a/internal/target/schemawatch/coldata.go b/internal/target/schemawatch/coldata.go index 918cea99d..599f7ceae 100644 --- a/internal/target/schemawatch/coldata.go +++ b/internal/target/schemawatch/coldata.go @@ -37,8 +37,6 @@ func colSliceEqual(a, b []types.ColData) bool { // // Parts of the CTE: // * pk_name: finds the name of the primary key constraint for the table -// or returns the CRDB-default "primary" value in cases where no -// explicit PK. // * pks: extracts the names of the PK columns and their relative // positions. We exclude any "storing" columns to account for rowid // value. @@ -51,9 +49,7 @@ const sqlColumnsQuery = ` WITH pk_name AS ( SELECT constraint_name FROM [SHOW CONSTRAINTS FROM %[1]s] - WHERE constraint_type = 'PRIMARY KEY' - UNION ALL SELECT 'primary' - LIMIT 1), + WHERE constraint_type = 'PRIMARY KEY'), pks AS ( SELECT column_name, seq_in_index FROM [SHOW INDEX FROM %[1]s] JOIN pk_name ON (index_name = constraint_name) @@ -90,6 +86,7 @@ func getColumns( // Clear from previous loop. columns = columns[:0] + foundPrimay := false for rows.Next() { var column types.ColData var name string @@ -97,8 +94,33 @@ func getColumns( return err } column.Name = ident.New(name) + if column.Primary { + foundPrimay = true + } columns = append(columns, column) } + + // If there are no primary key columns, we know that a synthetic + // rowid column will exist. We'll prepend it to the slice and + // then delete the + if !foundPrimay { + rowID := types.ColData{ + Ignored: false, + Name: ident.New("rowid"), + Primary: true, + Type: "INT8", + } + // Filter and prepend. + curIdx := 0 + for _, col := range columns { + if col.Name != rowID.Name { + columns[curIdx] = col + curIdx++ + } + } + columns = append([]types.ColData{rowID}, columns[:curIdx]...) + } + return nil }) return columns, err diff --git a/internal/target/sinktest/sinktest.go b/internal/target/sinktest/sinktest.go index 403efdd58..ce817bcc6 100644 --- a/internal/target/sinktest/sinktest.go +++ b/internal/target/sinktest/sinktest.go @@ -17,6 +17,7 @@ import ( "fmt" "math/rand" "os" + "strings" "sync" "time" @@ -28,9 +29,7 @@ import ( ) var connString = flag.String("testConnect", - "postgresql://root@localhost:26257/defaultdb"+ - "?sslmode=disable"+ - "&experimental_enable_hash_sharded_indexes=true", + "postgresql://root@localhost:26257/defaultdb?sslmode=disable", "the connection string to use for testing") var globalDBInfo struct { @@ -45,6 +44,7 @@ func bootstrap(ctx context.Context) (*DBInfo, error) { if globalDBInfo.DBInfo != nil { return globalDBInfo.DBInfo, nil } + globalDBInfo.DBInfo = &DBInfo{} if !flag.Parsed() { flag.Parse() @@ -60,7 +60,27 @@ func bootstrap(ctx context.Context) (*DBInfo, error) { if err != nil { return nil, errors.Wrap(err, "could not open database connection") } - globalDBInfo.DBInfo = &DBInfo{db: pool} + + if err := retry.Retry(ctx, func(ctx context.Context) error { + return pool.QueryRow(ctx, "SELECT version()").Scan(&globalDBInfo.version) + }); err != nil { + return nil, errors.Wrap(err, "could not determine cluster version") + } + + // Reset the pool to one that enables the hash-sharded feature in + // older versions of CockroachDB. + if !strings.Contains(globalDBInfo.version, "v22.") { + cfg := pool.Config().Copy() + cfg.ConnConfig.RuntimeParams["experimental_enable_hash_sharded_indexes"] = "true" + + pool.Close() + pool, err = pgxpool.ConnectConfig(ctx, cfg) + if err != nil { + return nil, errors.Wrap(err, "could not re-open pool") + } + } + + globalDBInfo.db = pool if lic, ok := os.LookupEnv("COCKROACH_DEV_LICENSE"); ok { if err := retry.Execute(ctx, pool, @@ -81,12 +101,6 @@ func bootstrap(ctx context.Context) (*DBInfo, error) { return nil, errors.Wrap(err, "could not enable rangefeeds") } - if err := retry.Retry(ctx, func(ctx context.Context) error { - return pool.QueryRow(ctx, "SELECT version()").Scan(&globalDBInfo.version) - }); err != nil { - return nil, errors.Wrap(err, "could not determine cluster version") - } - return globalDBInfo.DBInfo, nil }