Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(upgrade): don't drop shard-group durations when upgrading DBs #22650

Merged
merged 3 commits into from
Oct 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/influxd/upgrade/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func upgradeDatabases(ctx context.Context, cli clients.CLI, v1 *influxDBv1, v2 *
Description: fmt.Sprintf("Upgraded from v1 database %s with retention policy %s", db.Name, rp.Name),
RetentionPolicyName: rp.Name,
RetentionPeriod: rp.Duration,
ShardGroupDuration: rp.ShardGroupDuration,
}
log.Debug("Creating bucket", zap.String("Bucket", bucket.Name))
err = v2.bucketSvc.CreateBucket(ctx, bucket)
Expand Down
1 change: 1 addition & 0 deletions cmd/influxd/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func TestUpgradeRealDB(t *testing.T) {
case bucketNames[6]:
emptyBucketId = b.ID.String()
}
require.NotZero(t, b.ShardGroupDuration)
}
require.NoDirExists(t, filepath.Join(enginePath, "data", "_internal"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,110 +10,112 @@ import (
"github.com/influxdata/influxdb/v2/v1/services/meta"
)

var Migration0015_RecordShardGroupDurationsInBucketMetadata = UpOnlyMigration(
"record shard group durations in bucket metadata",
func(ctx context.Context, store kv.SchemaStore) error {
type bucket struct {
ID platform.ID `json:"id,omitempty"`
OrgID platform.ID `json:"orgID,omitempty"`
Type int `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
RetentionPeriod time.Duration `json:"retentionPeriod"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`

// This is expected to be 0 for all buckets created before
// we began tracking it in metadata.
ShardGroupDuration time.Duration `json:"shardGroupDuration"`
func repairMissingShardGroupDurations(ctx context.Context, store kv.SchemaStore) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The repair logic in the new migration is identical to the logic in the migration I added to initialize shard-group duration values in v2.0.5. I extracted the logic into a function so it could be reused.

type bucket struct {
ID platform.ID `json:"id,omitempty"`
OrgID platform.ID `json:"orgID,omitempty"`
Type int `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
RetentionPolicyName string `json:"rp,omitempty"` // This to support v1 sources
RetentionPeriod time.Duration `json:"retentionPeriod"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`

// This is expected to be 0 for all buckets created before
// we began tracking it in metadata.
ShardGroupDuration time.Duration `json:"shardGroupDuration"`
}
bucketBucket := []byte("bucketsv1")

// Collect buckets that need to be updated
var buckets []*bucket
if err := store.View(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(bucketBucket)
if err != nil {
return err
}

cursor, err := bkt.ForwardCursor(nil)
if err != nil {
return err
}

return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
var b bucket
if err := json.Unmarshal(v, &b); err != nil {
return false, err
}
if b.ShardGroupDuration == 0 {
buckets = append(buckets, &b)
}

return true, nil
})
}); err != nil {
return err
}

batchSize := 100
writeBatch := func(batch []*bucket) (err error) {
ids := make([][]byte, len(batch))
for i, b := range batch {
ids[i], err = b.ID.Encode()
if err != nil {
return err
}
}
bucketBucket := []byte("bucketsv1")

// Collect buckets that need to be updated
var buckets []*bucket
if err := store.View(ctx, func(tx kv.Tx) error {
return store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(bucketBucket)
if err != nil {
return err
}

cursor, err := bkt.ForwardCursor(nil)
values, err := bkt.GetBatch(ids...)
if err != nil {
return err
}

return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
for i, value := range values {
var b bucket
if err := json.Unmarshal(v, &b); err != nil {
return false, err
}
if b.ShardGroupDuration == 0 {
buckets = append(buckets, &b)
if err := json.Unmarshal(value, &b); err != nil {
return err
}

return true, nil
})
}); err != nil {
return err
}

batchSize := 100
writeBatch := func(batch []*bucket) (err error) {
ids := make([][]byte, len(batch))
for i, b := range batch {
ids[i], err = b.ID.Encode()
if err != nil {
return err
if b.ShardGroupDuration == 0 {
// Backfill the duration using the same method used
// to derive the value within the storage engine.
b.ShardGroupDuration = meta.NormalisedShardDuration(0, b.RetentionPeriod)
}
}

return store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(bucketBucket)
updated, err := json.Marshal(b)
if err != nil {
return err
}

values, err := bkt.GetBatch(ids...)
if err != nil {
if err := bkt.Put(ids[i], updated); err != nil {
return err
}
}

for i, value := range values {
var b bucket
if err := json.Unmarshal(value, &b); err != nil {
return err
}

if b.ShardGroupDuration == 0 {
// Backfill the duration using the same method used
// to dervie the value within the storage engine.
b.ShardGroupDuration = meta.NormalisedShardDuration(0, b.RetentionPeriod)
}

updated, err := json.Marshal(b)
if err != nil {
return err
}
if err := bkt.Put(ids[i], updated); err != nil {
return err
}
}
return nil
})
}

return nil
})
for i := 0; i < len(buckets); i += batchSize {
end := i + batchSize
if end > len(buckets) {
end = len(buckets)
}

for i := 0; i < len(buckets); i += batchSize {
end := i + batchSize
if end > len(buckets) {
end = len(buckets)
}
if err := writeBatch(buckets[i:end]); err != nil {
return err
}
if err := writeBatch(buckets[i:end]); err != nil {
return err
}
}

return nil
}

return nil
},
var Migration0015_RecordShardGroupDurationsInBucketMetadata = UpOnlyMigration(
"record shard group durations in bucket metadata",
repairMissingShardGroupDurations,
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
)

func TestMigration_ShardGroupDuration(t *testing.T) {
testRepairMissingShardGroupDurations(t, 15)
}

func testRepairMissingShardGroupDurations(t *testing.T, migrationNum int) {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

// Run up to migration 14.
ts := newService(t, ctx, 14)
// Run up to the migration before the migration-under-test.
ts := newService(t, ctx, migrationNum-2)

// Seed some buckets.
buckets := []*influxdb.Bucket{
Expand Down Expand Up @@ -65,8 +69,8 @@ func TestMigration_ShardGroupDuration(t *testing.T) {
})
require.NoError(t, err)

// Run the migration.
require.NoError(t, Migration0015_RecordShardGroupDurationsInBucketMetadata.Up(context.Background(), ts.Store))
// Run the migration-under-test.
require.NoError(t, Migrations[migrationNum-1].Up(context.Background(), ts.Store))

// Read the buckets back out of the store.
migratedBuckets := make([]influxdb.Bucket, len(buckets))
Expand Down
6 changes: 6 additions & 0 deletions kv/migration/all/0018_repair-missing-shard-group-durations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package all

var Migration0018_RepairMissingShardGroupDurations = UpOnlyMigration(
"repair missing shard group durations",
repairMissingShardGroupDurations,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package all

import "testing"

func TestMigration_PostUpgradeShardGroupDuration(t *testing.T) {
testRepairMissingShardGroupDurations(t, 18)
}
2 changes: 2 additions & 0 deletions kv/migration/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,7 @@ var Migrations = [...]migration.Spec{
Migration0016_AddAnnotationsNotebooksToOperToken,
// add annotations and notebooks resource types to all-access tokens
Migration0017_AddAnnotationsNotebooksToAllAccessTokens,
// repair missing shard group durations
Migration0018_RepairMissingShardGroupDurations,
// {{ do_not_edit . }}
}