Skip to content

Commit

Permalink
fix(kv): add KV migration to repair DBRP mappings broken by schema ch…
Browse files Browse the repository at this point in the history
…ange (#20168)
  • Loading branch information
danxmoran committed Nov 25, 2020
1 parent efe4256 commit ba4856e
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 0 deletions.
127 changes: 127 additions & 0 deletions kv/migration/all/0013_repair-DBRP-owner-and-bucket-IDs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package all

import (
"context"
"encoding/json"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kv"
)

var Migration0013_RepairDBRPOwnerAndBucketIDs = UpOnlyMigration(
"repair DBRP owner and bucket IDs",
func(ctx context.Context, store kv.SchemaStore) error {
type oldStyleMapping struct {
ID influxdb.ID `json:"id"`
Database string `json:"database"`
RetentionPolicy string `json:"retention_policy"`
Default bool `json:"default"`

// These 2 fields were renamed.
OrganizationID influxdb.ID `json:"organization_id"`
BucketID influxdb.ID `json:"bucket_id"`
}

// Collect DBRPs that are using the old schema.
var mappings []*oldStyleMapping
if err := store.View(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(dbrpBucket)
if err != nil {
return err
}

cursor, err := bkt.ForwardCursor(nil)
if err != nil {
return err
}

return kv.WalkCursor(ctx, cursor, func(_, v []byte) (bool, error) {
var mapping oldStyleMapping
if err := json.Unmarshal(v, &mapping); err != nil {
return false, err
}

// DBRPs that are already stored in the new schema will end up with
// invalid (zero) values for the 2 ID fields when unmarshalled using
// the old JSON schema.
if mapping.OrganizationID.Valid() && mapping.BucketID.Valid() {
mappings = append(mappings, &mapping)
}

return true, nil
})
}); err != nil {
return err
}

type newStyleDbrpMapping struct {
ID influxdb.ID `json:"id"`
Database string `json:"database"`
RetentionPolicy string `json:"retention_policy"`
Default bool `json:"default"`

// New names for the 2 renamed fields.
OrganizationID influxdb.ID `json:"orgID"`
BucketID influxdb.ID `json:"bucketID"`
}
batchSize := 100
writeBatch := func(batch []*oldStyleMapping) (err error) {
ids := make([][]byte, len(batch))
for i, mapping := range batch {
ids[i], err = mapping.ID.Encode()
if err != nil {
return
}
}

return store.Update(ctx, func(tx kv.Tx) error {
bkt, err := tx.Bucket(dbrpBucket)
if err != nil {
return err
}

values, err := bkt.GetBatch(ids...)
if err != nil {
return err
}

for i, value := range values {
var mapping newStyleDbrpMapping
if err := json.Unmarshal(value, &mapping); err != nil {
return err
}

if !mapping.OrganizationID.Valid() {
mapping.OrganizationID = batch[i].OrganizationID
}
if !mapping.BucketID.Valid() {
mapping.BucketID = batch[i].BucketID
}

updated, err := json.Marshal(mapping)
if err != nil {
return err
}

if err := bkt.Put(ids[i], updated); err != nil {
return err
}
}

return nil
})
}

for i := 0; i < len(mappings); i += batchSize {
end := i + batchSize
if end > len(mappings) {
end = len(mappings)
}
if err := writeBatch(mappings[i:end]); err != nil {
return err
}
}

return nil
},
)
8 changes: 8 additions & 0 deletions kv/migration/all/0014_reindex-DBRPs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package all

import (
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/kv"
)

var Migration0014_ReindexDBRPs = kv.NewIndexMigration(dbrp.ByOrgIDIndexMapping)
4 changes: 4 additions & 0 deletions kv/migration/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,9 @@ var Migrations = [...]migration.Spec{
Migration0011_PopulateDashboardsOwnerId,
// Populate the DBRP service ByOrg index
Migration0012_DBRPByOrgIndex,
// repair DBRP owner and bucket IDs
Migration0013_RepairDBRPOwnerAndBucketIDs,
// reindex DBRPs
Migration0014_ReindexDBRPs,
// {{ do_not_edit . }}
}

0 comments on commit ba4856e

Please sign in to comment.