Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add integration tests for Bit Reversed Sequences #7924

Merged
merged 6 commits into from
Aug 7, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 275 additions & 18 deletions spanner/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"os/exec"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -57,12 +58,13 @@ const (
directPathIPV6Prefix = "[2001:4860:8040"
directPathIPV4Prefix = "34.126"

singerDDLStatements = "SINGER_DDL_STATEMENTS"
simpleDDLStatements = "SIMPLE_DDL_STATEMENTS"
readDDLStatements = "READ_DDL_STATEMENTS"
backupDDLStatements = "BACKUP_DDL_STATEMENTS"
testTableDDLStatements = "TEST_TABLE_DDL_STATEMENTS"
fkdcDDLStatements = "FKDC_DDL_STATEMENTS"
singerDDLStatements = "SINGER_DDL_STATEMENTS"
simpleDDLStatements = "SIMPLE_DDL_STATEMENTS"
readDDLStatements = "READ_DDL_STATEMENTS"
backupDDLStatements = "BACKUP_DDL_STATEMENTS"
testTableDDLStatements = "TEST_TABLE_DDL_STATEMENTS"
fkdcDDLStatements = "FKDC_DDL_STATEMENTS"
testTableBitReversedSeqStatements = "TEST_TABLE_BIT_REVERSED_SEQUENCE_STATEMENTS"
)

var (
Expand Down Expand Up @@ -288,22 +290,51 @@ var (
PRIMARY KEY (CartId))`,
}

bitReverseSeqDBStatments = []string{
`CREATE SEQUENCE seqT OPTIONS (sequence_kind = "bit_reversed_positive")`,
`CREATE TABLE T (
id INT64 DEFAULT (GET_NEXT_SEQUENCE_VALUE('seqT')),
rahul2393 marked this conversation as resolved.
Show resolved Hide resolved
value INT64,
counter INT64 DEFAULT (GET_INTERNAL_SEQUENCE_STATE('seqT')),
rahul2393 marked this conversation as resolved.
Show resolved Hide resolved
br_id INT64 AS (BIT_REVERSE(id, true)) STORED,
CONSTRAINT id_gt_0 CHECK (id > 0),
CONSTRAINT counter_gt_br_id CHECK (counter >= br_id),
CONSTRAINT br_id_true CHECK (id = BIT_REVERSE(br_id, true)),
) PRIMARY KEY (id)`,
}

bitReverseSeqDBPGStatments = []string{
`CREATE SEQUENCE seqT BIT_REVERSED_POSITIVE`,
`CREATE TABLE T (
id BIGINT DEFAULT nextval('seqT'),
value BIGINT,
counter BIGINT DEFAULT spanner.get_internal_sequence_state('seqT'),
br_id bigint GENERATED ALWAYS AS (spanner.bit_reverse(id, true)) STORED,
CHECK (id > 0),
CHECK (counter >= br_id),
CHECK (id = spanner.bit_reverse(br_id, true)),
PRIMARY KEY (id)
)`,
}

statements = map[adminpb.DatabaseDialect]map[string][]string{
adminpb.DatabaseDialect_GOOGLE_STANDARD_SQL: {
singerDDLStatements: singerDBStatements,
simpleDDLStatements: simpleDBStatements,
readDDLStatements: readDBStatements,
backupDDLStatements: backupDBStatements,
testTableDDLStatements: readDBStatements,
fkdcDDLStatements: fkdcDBStatements,
singerDDLStatements: singerDBStatements,
simpleDDLStatements: simpleDBStatements,
readDDLStatements: readDBStatements,
backupDDLStatements: backupDBStatements,
testTableDDLStatements: readDBStatements,
fkdcDDLStatements: fkdcDBStatements,
testTableBitReversedSeqStatements: bitReverseSeqDBStatments,
},
adminpb.DatabaseDialect_POSTGRESQL: {
singerDDLStatements: singerDBPGStatements,
simpleDDLStatements: simpleDBPGStatements,
readDDLStatements: readDBPGStatements,
backupDDLStatements: backupDBPGStatements,
testTableDDLStatements: readDBPGStatements,
fkdcDDLStatements: fkdcDBPGStatements,
singerDDLStatements: singerDBPGStatements,
simpleDDLStatements: simpleDBPGStatements,
readDDLStatements: readDBPGStatements,
backupDDLStatements: backupDBPGStatements,
testTableDDLStatements: readDBPGStatements,
fkdcDDLStatements: fkdcDBPGStatements,
testTableBitReversedSeqStatements: bitReverseSeqDBPGStatments,
},
}

Expand Down Expand Up @@ -4632,6 +4663,204 @@ func TestIntegration_GFE_Latency(t *testing.T) {
DisableGfeLatencyAndHeaderMissingCountViews()
}

func TestIntegration_Bit_Reversed_Sequence(t *testing.T) {
skipEmulatorTest(t)
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// create table with bit reverse seq options
client, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableBitReversedSeqStatements])
defer cleanup()

var tests = []struct {
name string
test func() error
wantErr error
}{
{
name: "success: inserted rows should have auto generated keys",
test: func() error {
var (
values [][]interface{}
err error
)
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES (100), (200), (300) THEN RETURN id, counter"))
values, err = readAllBitReversedSeqTable(iter, true)
return err
})
if len(values) != 3 {
return errors.New("expected 3 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 3; i++ {
newID, newCounter := values[i][0].(int64), values[i][1].(int64)
if newID <= 0 {
return errors.New("expected id1, id2, id3 > 0")

}
if newCounter < counter {
return errors.New("expected c3 >= c2 >= c1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we tell here that c1, c2, c3 are different states of counters. Currently it is bit hard to understand directly.

}
counter = newCounter
}
iter := client.Single().Query(ctx, NewStatement("SELECT GET_INTERNAL_SEQUENCE_STATE('seqT')"))
rahul2393 marked this conversation as resolved.
Show resolved Hide resolved
r, err := iter.Next()
if err != nil {
return err
}
var c3 int64
err = r.Columns(&c3)
if err != nil {
return err
}
if c3 > counter {
return errors.New("expected c3 <= SELECT GET_INTERNAL_SEQUENCE_STATE('seqT')")
}
return err
},
},
{
name: "success: reduce ranges to half",
test: func() error {
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{`ALTER SEQUENCE seqT SET OPTIONS (
skip_range_min = 0,
skip_range_max = 4611686018427387904
)`},
})
if err != nil {
t.Fatalf("cannot modify sequence %v: %v", dbPath, err)
}
if err := op.Wait(ctx); err != nil {
t.Fatalf("timeout modifying sequence %v: %v", dbPath, err)
}
var values [][]interface{}
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
var v string
for i := 1; i <= 100; i++ {
v = v + " (" + strconv.Itoa(i) + ")"
if i != 100 {
v = v + ", "
}
}
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES "+v+" THEN RETURN id, counter"))
values, err = readAllBitReversedSeqTable(iter, true)
return err
})
if err != nil {
return err
}
if len(values) != 100 {
return errors.New("expected 100 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 100; i++ {
newID, newCounter := values[i][0].(int64), values[i][1].(int64)
if newID <= 0 || newID < 4611686018427387904 {
return errors.New("expected d1, id2, id3, …., id100 > 4611686018427387904")

}
if newCounter < counter {
return errors.New("expected c3 >= c2 >= c1")
}
counter = newCounter
}
return err
},
},
{
name: "success: move start with counter forward",
test: func() error {
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{`ALTER SEQUENCE seqT SET OPTIONS (start_with_counter=10001)`},
})
if err != nil {
t.Fatalf("cannot modify sequence %v: %v", dbPath, err)
}
if err := op.Wait(ctx); err != nil {
t.Fatalf("timeout modifying sequence %v: %v", dbPath, err)
}
var values [][]interface{}
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.Query(ctx, NewStatement("INSERT INTO T (value) VALUES (4), (5), (6) THEN RETURN id, br_id, counter"))
values, err = readAllBitReversedSeqTable(iter, false)
return err
})
if err != nil {
return err
}
if len(values) != 3 {
return errors.New("expected 3 rows to be inserted")
}
counter := int64(0)
for i := 0; i < 3; i++ {
newID, newBrID, newCounter := values[i][0].(int64), values[i][1].(int64), values[i][2].(int64)
if newID <= 0 {
return errors.New("expected id > 0")
}
if newBrID < 10001 {
return errors.New("expected br_idi >= 10001")
}
if newCounter < 10001 {
return errors.New("expected ci >= 10001")
}
if counter < newCounter {
counter = newCounter
}
}
iter := client.Single().Query(ctx, NewStatement("SELECT GET_INTERNAL_SEQUENCE_STATE('seqT')"))
r, err := iter.Next()
if err != nil {
return err
}
var c3 int64
err = r.Columns(&c3)
if err != nil {
return err
}
if c3 > counter {
return errors.New("expected max(ci) <= SELECT GET_INTERNAL_SEQUENCE_STATE('seqT')")
}
return err
},
},
{
name: "success: drop the sequences",
test: func() error {
op, err := databaseAdmin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbPath,
Statements: []string{
"ALTER TABLE T ALTER COLUMN id DROP DEFAULT",
"ALTER TABLE T ALTER COLUMN counter DROP DEFAULT",
"ALTER TABLE T DROP CONSTRAINT id_gt_0",
"ALTER TABLE T DROP CONSTRAINT counter_gt_br_id",
"ALTER TABLE T DROP CONSTRAINT br_id_true",
"DROP SEQUENCE seqT",
},
})
if err != nil {
t.Fatalf("cannot delete sequences %v: %v", dbPath, err)
}
if err := op.Wait(ctx); err != nil {
t.Fatalf("timeout deleting sequence %v: %v", dbPath, err)
}
return nil
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotErr := tt.test()
if gotErr != nil && strings.ToLower(gotErr.Error()) != strings.ToLower(tt.wantErr.Error()) {
t.Errorf("BIT REVERSED SEQUENECES error=%v, wantErr: %v", gotErr, tt.wantErr)
}
})
}
}

// Prepare initializes Cloud Spanner testing DB and clients.
func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) {
return prepareDBAndClient(ctx, t, spc, statements, testDialect)
Expand Down Expand Up @@ -4976,6 +5205,34 @@ func readAllAccountsTable(iter *RowIterator) ([][]interface{}, error) {
}
}

func readAllBitReversedSeqTable(iter *RowIterator, onlyIDCounter bool) ([][]interface{}, error) {
defer iter.Stop()
var vals [][]interface{}
for {
row, err := iter.Next()
if err == iterator.Done {
return vals, nil
}
if err != nil {
return nil, err
}
var id, brID, counter int64
if onlyIDCounter {
err = row.Columns(&id, &counter)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, counter})
continue
}
err = row.Columns(&id, &brID, &counter)
if err != nil {
return nil, err
}
vals = append(vals, []interface{}{id, brID, counter})
}
}

func maxDuration(a, b time.Duration) time.Duration {
if a > b {
return a
Expand Down