Skip to content

Commit

Permalink
test(bigtable): Retry if already exists (#11150)
Browse files Browse the repository at this point in the history
* test(internal): Add fatal to stop retrying

* feat(bigtable): Retry on exists
  • Loading branch information
bhshkh authored Dec 19, 2024
1 parent d49263b commit 6eeaed2
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 48 deletions.
2 changes: 1 addition & 1 deletion bigtable/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module cloud.google.com/go/bigtable
go 1.21

require (
cloud.google.com/go v0.116.0
cloud.google.com/go v0.117.0
cloud.google.com/go/iam v1.2.2
cloud.google.com/go/longrunning v0.6.2
github.com/google/btree v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions bigtable/go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
cel.dev/expr v0.16.0 h1:yloc84fytn4zmJX2GU3TkXGsaieaV7dQ057Qs4sIG2Y=
cel.dev/expr v0.16.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.116.0 h1:B3fRrSDkLRt5qSHWe40ERJvhvnQwdZiHu0bJOpldweE=
cloud.google.com/go v0.116.0/go.mod h1:cEPSRWPzZEswwdr9BxE6ChEn01dWlTaF05LiC2Xs70U=
cloud.google.com/go v0.117.0 h1:Z5TNFfQxj7WG2FgOGX1ekC5RiXrYgms6QscOm32M/4s=
cloud.google.com/go v0.117.0/go.mod h1:ZbwhVTb1DBGt2Iwb3tNO6SEK4q+cplHZmLWH+DelYYc=
cloud.google.com/go/auth v0.12.1 h1:n2Bj25BUMM0nvE9D2XLTiImanwZhO3DkfWSYS/SAJP4=
cloud.google.com/go/auth v0.12.1/go.mod h1:BFMu+TNpF3DmvfBO9ClqTR/SiqVIm7LukKF9mbendF4=
cloud.google.com/go/auth/oauth2adapt v0.2.6 h1:V6a6XDu2lTwPZWOawrAa9HUK+DB2zfJyTuciBG5hFkU=
Expand Down
154 changes: 109 additions & 45 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestIntegration_UpdateFamilyValueType(t *testing.T) {
t.Cleanup(cleanup)
familyName := "new_family"
// Create a new column family
if err = adminClient.CreateColumnFamily(ctx, tableName, familyName); err != nil {
if err = createColumnFamilyWithRetry(ctx, t, adminClient, tableName, familyName, nil); err != nil {
t.Fatalf("Failed to create column family: %v", err)
}
// the type of the family is not aggregate
Expand Down Expand Up @@ -473,7 +473,7 @@ func TestIntegration_ReadModifyWrite(t *testing.T) {
t.Fatal(err)
}

if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "counter", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}

Expand Down Expand Up @@ -556,7 +556,7 @@ func TestIntegration_ArbitraryTimestamps(t *testing.T) {
defer cleanup()

// Test arbitrary timestamps more thoroughly.
if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "ts", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
const numVersions = 4
Expand Down Expand Up @@ -658,7 +658,7 @@ func TestIntegration_ArbitraryTimestamps(t *testing.T) {
// Delete non-existing cells, no such column family in this row
// Should not delete anything

if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "non-existing"); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "non-existing", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
mut = NewMutation()
Expand Down Expand Up @@ -709,7 +709,7 @@ func TestIntegration_ArbitraryTimestamps(t *testing.T) {
}

// Check DeleteCellsInFamily
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "status"); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "status", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}

Expand Down Expand Up @@ -842,7 +842,7 @@ func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
t.Fatal(err)
}

if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "ts", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}

Expand Down Expand Up @@ -900,14 +900,14 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
t.Skip("Skip long running tests in short mode or non-prod environments")
}

columnFamilyName := "export"
if err := adminClient.CreateColumnFamily(ctx, tableName, columnFamilyName); err != nil {
family := "export"
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, family, nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}

for i := 0; i < 10; i++ {
mut := NewMutation()
mut.Set(columnFamilyName, "col", 1000, []byte("test"))
mut.Set(family, "col", 1000, []byte("test"))
if err := table.Apply(ctx, fmt.Sprintf("row-%v", i), mut); err != nil {
t.Fatalf("Apply: %v", err)
}
Expand Down Expand Up @@ -981,7 +981,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
}

ts := uid.NewSpace("ts", &uid.Options{Short: true}).New()
if err := adminClient.CreateColumnFamily(ctx, tableName, ts); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, ts, nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}

Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {

// Test bulk mutations
bulk := uid.NewSpace("bulk", &uid.Options{Short: true}).New()
if err := adminClient.CreateColumnFamily(ctx, tableName, bulk); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, tableName, bulk, nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}
bulkData := map[string][]string{
Expand Down Expand Up @@ -1716,13 +1716,13 @@ func TestIntegration_SampleRowKeys(t *testing.T) {
defer cleanup()

presplitTable := fmt.Sprintf("presplit-table-%d", time.Now().Unix())
if err := adminClient.CreatePresplitTable(ctx, presplitTable, []string{"follows"}); err != nil {
if err := createPresplitTableWithRetry(ctx, t, adminClient, presplitTable, []string{"follows"}); err != nil {
t.Fatal(err)
}
defer adminClient.DeleteTable(ctx, presplitTable)

cf := uid.NewSpace("follows", &uid.Options{Short: true}).New()
if err := adminClient.CreateColumnFamily(ctx, presplitTable, cf); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, presplitTable, cf, nil); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1786,7 +1786,7 @@ func TestIntegration_TableDeletionProtection(t *testing.T) {
DeletionProtection: Protected,
}

if err := adminClient.CreateTableFromConf(ctx, &tableConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, adminClient, &tableConf); err != nil {
t.Fatalf("Create table from config: %v", err)
}

Expand Down Expand Up @@ -1867,7 +1867,7 @@ func TestIntegration_EnableChangeStream(t *testing.T) {
ChangeStreamRetention: changeStreamRetention,
}

if err := adminClient.CreateTableFromConf(ctx, &tableConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, adminClient, &tableConf); err != nil {
t.Fatalf("Create table from config: %v", err)
}

Expand Down Expand Up @@ -1973,7 +1973,7 @@ func TestIntegration_AutomatedBackups(t *testing.T) {
AutomatedBackupConfig: &automatedBackupPolicy,
}

if err := adminClient.CreateTableFromConf(ctx, &tableConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, adminClient, &tableConf); err != nil {
t.Fatalf("Create table from config: %v", err)
}
defer deleteTable(ctx, t, adminClient, tableConf.TableID)
Expand Down Expand Up @@ -2153,7 +2153,7 @@ func TestIntegration_Admin(t *testing.T) {
"fam2": MaxVersionsPolicy(2),
},
}
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
defer deleteTable(ctx, t, adminClient, tblConf.TableID)
Expand All @@ -2169,7 +2169,7 @@ func TestIntegration_Admin(t *testing.T) {
}

// Populate mytable and drop row ranges
if err = adminClient.CreateColumnFamily(ctx, myTableName, "cf"); err != nil {
if err = createColumnFamilyWithRetry(ctx, t, adminClient, myTableName, "cf", nil); err != nil {
t.Fatalf("Creating column family: %v", err)
}

Expand Down Expand Up @@ -3061,7 +3061,10 @@ func TestIntegration_Autoscaling(t *testing.T) {

serveNodes := 1
t.Logf("setting autoscaling OFF and setting serve nodes to %v", serveNodes)
err = iAdminClient.UpdateCluster(ctx, instanceToCreate, clusterID, int32(serveNodes))
err = retry(t,
func() error {
return iAdminClient.UpdateCluster(ctx, instanceToCreate, clusterID, int32(serveNodes))
}, nil)
if err != nil {
t.Fatalf("UpdateCluster: %v", err)
}
Expand Down Expand Up @@ -3459,7 +3462,10 @@ func TestIntegration_InstanceUpdate(t *testing.T) {

const numNodes = 4
// update cluster nodes
if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
if err := retry(t,
func() error {
return iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes))
}, nil); err != nil {
t.Errorf("UpdateCluster: %v", err)
}

Expand Down Expand Up @@ -3534,7 +3540,7 @@ func TestIntegration_AdminCopyBackup(t *testing.T) {
},
}
defer deleteTable(ctx, t, srcAdminClient, tblConf.TableID)
if err := srcAdminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, srcAdminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}

Expand Down Expand Up @@ -3699,7 +3705,7 @@ func TestIntegration_AdminBackup(t *testing.T) {
"fam2": MaxVersionsPolicy(2),
},
}
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
Expand Down Expand Up @@ -3895,7 +3901,7 @@ func TestIntegration_AdminAuthorizedView(t *testing.T) {
"fam2": MaxVersionsPolicy(2),
},
}
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
Expand Down Expand Up @@ -4014,7 +4020,7 @@ func TestIntegration_DataAuthorizedView(t *testing.T) {
"fam2": MaxVersionsPolicy(2),
},
}
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
if err := createTableFromConfWithRetry(ctx, t, adminClient, &tblConf); err != nil {
t.Fatalf("Creating table from TableConf: %v", err)
}
// Delete the table at the end of the test. Schedule ahead of time
Expand Down Expand Up @@ -4166,11 +4172,11 @@ func TestIntegration_DataAuthorizedView(t *testing.T) {

// Test SampleRowKeys
presplitTable := fmt.Sprintf("presplit-table-%d", time.Now().Unix())
if err := adminClient.CreatePresplitTable(ctx, presplitTable, []string{"r0", "r11", "r12", "r2"}); err != nil {
if err := createPresplitTableWithRetry(ctx, t, adminClient, presplitTable, []string{"r0", "r11", "r12", "r2"}); err != nil {
t.Fatal(err)
}
defer adminClient.DeleteTable(ctx, presplitTable)
if err := adminClient.CreateColumnFamily(ctx, presplitTable, "fam1"); err != nil {
if err := createColumnFamilyWithRetry(ctx, t, adminClient, presplitTable, "fam1", nil); err != nil {
t.Fatal(err)
}
defer adminClient.DeleteAuthorizedView(ctx, presplitTable, authorizedView)
Expand Down Expand Up @@ -4313,6 +4319,8 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C

adminClient, err := testEnv.NewAdminClient()
if err != nil {
cancel()
client.Close()
t.Logf("Error creating admin client: %v", err)

return nil, nil, nil, nil, "", nil, err
Expand All @@ -4328,33 +4336,41 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C

if err := createTableWithRetry(ctx, t, adminClient, tableName); err != nil {
cancel()
client.Close()
adminClient.Close()
t.Logf("Error creating table: %v", err)
return nil, nil, nil, nil, "", nil, err
}

err = retryOnUnavailable(ctx, func() error {
return adminClient.CreateColumnFamily(ctx, tableName, "follows")
})
err = createColumnFamilyWithRetry(ctx, t, adminClient, tableName, "follows", map[codes.Code]bool{codes.Unavailable: true})
if err != nil {
if deleteErr := adminClient.DeleteTable(ctx, tableName); deleteErr != nil {
t.Logf("DeleteTable got error %v", deleteErr)
}
cancel()
client.Close()
adminClient.Close()
t.Logf("Error creating column family: %v", err)
return nil, nil, nil, nil, "", nil, err
}

err = retryOnUnavailable(ctx, func() error {
return adminClient.CreateColumnFamilyWithConfig(ctx, tableName, "sum", Family{ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
}})
})
err = createColumnFamilyWithConfigWithRetry(ctx, t, adminClient, tableName, "sum", &Family{ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
}}, map[codes.Code]bool{codes.Unavailable: true})
if err != nil {
if deleteErr := deleteTableWithRetry(ctx, t, adminClient, tableName); deleteErr != nil {
t.Logf("DeleteTable got error %v", deleteErr)
}
cancel()
client.Close()
adminClient.Close()
t.Logf("Error creating aggregate column family: %v", err)
return nil, nil, nil, nil, "", nil, err
}

return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
if err := adminClient.DeleteTable(ctx, tableName); err != nil {
if err := deleteTableWithRetry(ctx, t, adminClient, tableName); err != nil {
t.Errorf("DeleteTable got error %v", err)
}
cancel()
Expand All @@ -4363,31 +4379,79 @@ func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *C
}, nil
}

func deleteTableWithRetry(ctx context.Context, t *testing.T, adminClient *AdminClient, tableName string) error {
return retry(t, func() error { return adminClient.DeleteTable(ctx, tableName) },
nil)
}

func createPresplitTableWithRetry(ctx context.Context, t *testing.T, adminClient *AdminClient, tableName string, splitKeys []string) error {
return retry(t, func() error { return adminClient.CreatePresplitTable(ctx, tableName, splitKeys) },
func() error { return adminClient.DeleteTable(ctx, tableName) })
}

func createTableFromConfWithRetry(ctx context.Context, t *testing.T, adminClient *AdminClient, conf *TableConf) error {
return retry(t, func() error { return adminClient.CreateTableFromConf(ctx, conf) },
func() error { return adminClient.DeleteTable(ctx, conf.TableID) })
}

func createTableWithRetry(ctx context.Context, t *testing.T, adminClient *AdminClient, tableName string) error {
// Error seen on last create attempt
var err error
return retry(t, func() error { return adminClient.CreateTable(ctx, tableName) },
func() error { return adminClient.DeleteTable(ctx, tableName) })
}

// retry 'f' and runs 'onExists' if 'f' returns AlreadyExists error
// onExists can be nil
func retry(t *testing.T, f func() error, onExists func() error) error {
if f == nil {
return nil
}

// Error seen on last attempt
var lastErr error

testutil.Retry(t, maxCreateAttempts, retryCreateBackoff, func(r *testutil.R) {
createErr := adminClient.CreateTable(ctx, tableName)
err = createErr
currErr := f()
lastErr = currErr

if createErr != nil {
r.Errorf(createErr.Error())
if currErr != nil {
r.Errorf(currErr.Error())

s, ok := status.FromError(lastErr)
if ok && s.Code() == codes.AlreadyExists && onExists != nil {
lastErr = onExists()
}
}
})
return err
return lastErr
}

func createColumnFamilyWithRetry(ctx context.Context, t *testing.T, adminClient *AdminClient, table, family string, retryableCodes map[codes.Code]bool) error {
return createColumnFamilyWithConfigWithRetry(ctx, t, adminClient, table, family, nil, retryableCodes)
}

func createColumnFamilyWithRetry(ctx context.Context, t *testing.T, adminClient *AdminClient, table, family string) error {
func createColumnFamilyWithConfigWithRetry(ctx context.Context, t *testing.T, adminClient *AdminClient, table, family string, config *Family, retryableCodes map[codes.Code]bool) error {
// Error seen on last create attempt
var err error

testutil.Retry(t, maxCreateAttempts, retryCreateBackoff, func(r *testutil.R) {
createErr := adminClient.CreateColumnFamily(ctx, table, family)
var createErr error
if config != nil {
createErr = adminClient.CreateColumnFamilyWithConfig(ctx, table, family, *config)
} else {
createErr = adminClient.CreateColumnFamily(ctx, table, family)
}
err = createErr

if createErr != nil {
r.Errorf(createErr.Error())
s, ok := status.FromError(err)
if ok && retryableCodes != nil && !retryableCodes[s.Code()] {
r.Fatalf(createErr.Error())
}
if ok && s.Code() == codes.AlreadyExists {
// delete before retry
err = adminClient.DeleteColumnFamily(ctx, table, family)
}
}
})
return err
Expand Down

0 comments on commit 6eeaed2

Please sign in to comment.