Skip to content

Commit

Permalink
feat(spanner): add integration tests for Bit Reversed Sequences (#7924)
Browse files Browse the repository at this point in the history
* feat(spanner): add integration tests for Bit Reversed Sequences

* fix tests

* fix typo

---------

Co-authored-by: Sri Harsha CH <[email protected]>
  • Loading branch information
rahul2393 and harshachinta authored Aug 7, 2023
1 parent f2af2a9 commit 9b6e7c6
Showing 1 changed file with 290 additions and 18 deletions.
308 changes: 290 additions & 18 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"os/exec"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -57,12 +58,13 @@ const (
directPathIPV6Prefix = "[2001:4860:8040"
directPathIPV4Prefix = "34.126"

singerDDLStatements = "SINGER_DDL_STATEMENTS"
simpleDDLStatements = "SIMPLE_DDL_STATEMENTS"
readDDLStatements = "READ_DDL_STATEMENTS"
backupDDLStatements = "BACKUP_DDL_STATEMENTS"
testTableDDLStatements = "TEST_TABLE_DDL_STATEMENTS"
fkdcDDLStatements = "FKDC_DDL_STATEMENTS"
singerDDLStatements = "SINGER_DDL_STATEMENTS"
simpleDDLStatements = "SIMPLE_DDL_STATEMENTS"
readDDLStatements = "READ_DDL_STATEMENTS"
backupDDLStatements = "BACKUP_DDL_STATEMENTS"
testTableDDLStatements = "TEST_TABLE_DDL_STATEMENTS"
fkdcDDLStatements = "FKDC_DDL_STATEMENTS"
testTableBitReversedSeqStatements = "TEST_TABLE_BIT_REVERSED_SEQUENCE_STATEMENTS"
)

var (
Expand Down Expand Up @@ -288,22 +290,51 @@ var (
PRIMARY KEY (CartId))`,
}

bitReverseSeqDBStatments = []string{
`CREATE SEQUENCE seqT OPTIONS (sequence_kind = "bit_reversed_positive")`,
`CREATE TABLE T (
id INT64 DEFAULT (GET_NEXT_SEQUENCE_VALUE(Sequence seqT)),
value INT64,
counter INT64 DEFAULT (GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)),
br_id INT64 AS (BIT_REVERSE(id, true)) STORED,
CONSTRAINT id_gt_0 CHECK (id > 0),
CONSTRAINT counter_gt_br_id CHECK (counter >= br_id),
CONSTRAINT br_id_true CHECK (id = BIT_REVERSE(br_id, true)),
) PRIMARY KEY (id)`,
}

bitReverseSeqDBPGStatments = []string{
`CREATE SEQUENCE seqT BIT_REVERSED_POSITIVE`,
`CREATE TABLE T (
id BIGINT DEFAULT nextval('seqT'),
value BIGINT,
counter BIGINT DEFAULT spanner.get_internal_sequence_state('seqT'),
br_id bigint GENERATED ALWAYS AS (spanner.bit_reverse(id, true)) STORED,
CONSTRAINT id_gt_0 CHECK (id > 0),
CONSTRAINT counter_gt_br_id CHECK (counter >= br_id),
CONSTRAINT br_id_true CHECK (id = spanner.bit_reverse(br_id, true)),
PRIMARY KEY (id)
)`,
}

statements = map[adminpb.DatabaseDialect]map[string][]string{
adminpb.DatabaseDialect_GOOGLE_STANDARD_SQL: {
singerDDLStatements: singerDBStatements,
simpleDDLStatements: simpleDBStatements,
readDDLStatements: readDBStatements,
backupDDLStatements: backupDBStatements,
testTableDDLStatements: readDBStatements,
fkdcDDLStatements: fkdcDBStatements,
singerDDLStatements: singerDBStatements,
simpleDDLStatements: simpleDBStatements,
readDDLStatements: readDBStatements,
backupDDLStatements: backupDBStatements,
testTableDDLStatements: readDBStatements,
fkdcDDLStatements: fkdcDBStatements,
testTableBitReversedSeqStatements: bitReverseSeqDBStatments,
},
adminpb.DatabaseDialect_POSTGRESQL: {
singerDDLStatements: singerDBPGStatements,
simpleDDLStatements: simpleDBPGStatements,
readDDLStatements: readDBPGStatements,
backupDDLStatements: backupDBPGStatements,
testTableDDLStatements: readDBPGStatements,
fkdcDDLStatements: fkdcDBPGStatements,
singerDDLStatements: singerDBPGStatements,
simpleDDLStatements: simpleDBPGStatements,
readDDLStatements: readDBPGStatements,
backupDDLStatements: backupDBPGStatements,
testTableDDLStatements: readDBPGStatements,
fkdcDDLStatements: fkdcDBPGStatements,
testTableBitReversedSeqStatements: bitReverseSeqDBPGStatments,
},
}

Expand Down Expand Up @@ -4632,6 +4663,219 @@ func TestIntegration_GFE_Latency(t *testing.T) {
DisableGfeLatencyAndHeaderMissingCountViews()
}

func TestIntegration_Bit_Reversed_Sequence(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// create table with bit reverse seq options
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableBitReversedSeqStatements])
defer cleanup()

returningSQL := `THEN RETURN`
internalStateSQL := `GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
returningSQL = `RETURNING`
internalStateSQL = `spanner.get_internal_sequence_state('seqT')`
}

var tests = []struct {
name string
test func() error
wantErr error
}{
{
name: "success: inserted rows should have auto generated keys",
test: func() error {
var (
values [][]interface{}
err error
)
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES (100), (200), (300) "+returningSQL+" id, counter"))
values, err = readAllBitReversedSeqTable(iter, true)
return err
})
if len(values) != 3 {
return errors.New("expected 3 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 3; i++ {
newID, newCounter := values[i][0].(int64), values[i][1].(int64)
if newID <= 0 {
return errors.New("expected id1, id2, id3 > 0")

}
if newCounter < counter {
return errors.New("expected c3 >= c2 >= c1")
}
counter = newCounter
}
iter := client.Single().Query(ctx, NewStatement("SELECT "+internalStateSQL))
r, err := iter.Next()
if err != nil {
return err
}
var c3 int64
err = r.Columns(&c3)
if err != nil {
return err
}
if c3 > counter {
return errors.New("expected c3 <= SELECT GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)")
}
return err
},
},
{
name: "success: reduce ranges to half",
test: func() error {
ddl := `ALTER SEQUENCE seqT SET OPTIONS (
skip_range_min = 0,
skip_range_max = 4611686018427387904
)`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
ddl = `ALTER SEQUENCE seqT SKIP RANGE 0 4611686018427387904`
}
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{ddl},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
var values [][]interface{}
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
var v string
for i := 1; i <= 100; i++ {
v = v + " (" + strconv.Itoa(i) + ")"
if i != 100 {
v = v + ", "
}
}
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES "+v+" "+returningSQL+" id, counter"))
values, err = readAllBitReversedSeqTable(iter, true)
return err
})
if err != nil {
return err
}
if len(values) != 100 {
return errors.New("expected 100 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 100; i++ {
newID, newCounter := values[i][0].(int64), values[i][1].(int64)
if newID <= 0 || newID < 4611686018427387904 {
return errors.New("expected d1, id2, id3, …., id100 > 4611686018427387904")

}
if newCounter < counter {
return errors.New("expected c3 >= c2 >= c1")
}
counter = newCounter
}
return err
},
},
{
name: "success: move start with counter forward",
test: func() error {
ddl := `ALTER SEQUENCE seqT SET OPTIONS (start_with_counter=10001)`
if testDialect == adminpb.DatabaseDialect_POSTGRESQL {
ddl = `ALTER SEQUENCE seqT RESTART COUNTER WITH 10001`
}
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{ddl},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
var values [][]interface{}
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES (4), (5), (6) "+returningSQL+" id, br_id, counter"))
values, err = readAllBitReversedSeqTable(iter, false)
return err
})
if err != nil {
return err
}
if len(values) != 3 {
return errors.New("expected 3 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 3; i++ {
newID, newBrID, newCounter := values[i][0].(int64), values[i][1].(int64), values[i][2].(int64)
if newID <= 0 {
return errors.New("expected id > 0")
}
if newBrID < 10001 {
return errors.New("expected br_idi >= 10001")
}
if newCounter < 10001 {
return errors.New("expected ci >= 10001")
}
if counter < newCounter {
counter = newCounter
}
}
iter := client.Single().Query(ctx, NewStatement("SELECT "+internalStateSQL))
r, err := iter.Next()
if err != nil {
return err
}
var c3 int64
err = r.Columns(&c3)
if err != nil {
return err
}
if c3 > counter {
return errors.New("expected max(ci) <= SELECT GET_INTERNAL_SEQUENCE_STATE(Sequence seqT)")
}
return err
},
},
{
name: "success: drop the sequences",
test: func() error {
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{
"ALTER TABLE T ALTER COLUMN id DROP DEFAULT",
"ALTER TABLE T ALTER COLUMN counter DROP DEFAULT",
"ALTER TABLE T DROP CONSTRAINT id_gt_0",
"ALTER TABLE T DROP CONSTRAINT counter_gt_br_id",
"ALTER TABLE T DROP CONSTRAINT br_id_true",
"DROP SEQUENCE seqT",
},
})
if err != nil {
return err
}
if err := op.Wait(ctx); err != nil {
return err
}
return nil
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotErr := tt.test()
if gotErr != nil && strings.ToLower(gotErr.Error()) != strings.ToLower(tt.wantErr.Error()) {
t.Errorf("BIT REVERSED SEQUENECES error=%v, wantErr: %v", gotErr, tt.wantErr)
}
})
}
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
return prepareDBAndClient(ctx, t, spc, statements, testDialect)
Expand Down Expand Up @@ -4976,6 +5220,34 @@ func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) {
}
}

func readAllBitReversedSeqTable(iter *RowIterator, onlyIDCounter bool) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
for {
row, err := iter.Next()
if err == iterator.Done {
return vals, nil
}
if err != nil {
return nil, err
}
var id, brID, counter int64
if onlyIDCounter {
err = row.Columns(&id, &counter)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, counter})
continue
}
err = row.Columns(&id, &brID, &counter)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, brID, counter})
}
}

func maxDuration(a, b time.Duration) time.Duration {
if a > b {
return a
Expand Down

0 comments on commit 9b6e7c6

Please sign in to comment.