Skip to content

Commit

Permalink
fix(cmd/influxd): improve influxd upgrade logging, and require manu…
Browse files Browse the repository at this point in the history
…al confirmation of data copy (#20440)
  • Loading branch information
danxmoran authored Jan 7, 2021
1 parent 52692ba commit 30306e5
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 106 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Replacement `tsi1` indexes will be automatically generated on startup for shards
1. [20313](https://github.com/influxdata/influxdb/pull/20313): Automatically build `tsi1` indexes for shards that need it instead of falling back to `inmem`.
1. [20313](https://github.com/influxdata/influxdb/pull/20313): Fix logging initialization for storage engine.
1. [20442](https://github.com/influxdata/influxdb/pull/20442): Don't return 500 codes for partial write failures.
1. [20440](https://github.com/influxdata/influxdb/pull/20440): Add confirmation step w/ file sizes before copying data files in `influxd upgrade`.

## v2.0.3 [2020-12-14]

Expand Down
145 changes: 79 additions & 66 deletions cmd/influxd/upgrade/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@ package upgrade

import (
"context"
"errors"
"fmt"
"github.com/dustin/go-humanize"
"os"
"path/filepath"
"strings"

"github.com/dustin/go-humanize"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/cmd/internal"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/tcnksm/go-input"
"go.uber.org/zap"
)

// upgradeDatabases creates databases, buckets, retention policies and shard info according to 1.x meta and copies data
func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opts *optionsV1, v2opts *optionsV2, orgID influxdb.ID, log *zap.Logger) (map[string][]influxdb.ID, error) {
func upgradeDatabases(ctx context.Context, ui *input.UI, v1 *influxDBv1, v2 *influxDBv2, opts *options, orgID influxdb.ID, log *zap.Logger) (map[string][]influxdb.ID, error) {
v1opts := opts.source
v2opts := opts.target
db2BucketIds := make(map[string][]influxdb.ID)

targetDataPath := filepath.Join(v2opts.enginePath, "data")
Expand All @@ -33,29 +38,8 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
log.Info("No database found in the 1.x meta")
return db2BucketIds, nil
}
// Check space
log.Info("Checking space")
size, err := DirSize(v1opts.dataDir)
if err != nil {
return nil, fmt.Errorf("error getting size of %s: %w", v1opts.dataDir, err)
}
size2, err := DirSize(v1opts.walDir)
if err != nil {
return nil, fmt.Errorf("error getting size of %s: %w", v1opts.walDir, err)
}
size += size2
v2dir := filepath.Dir(v2opts.boltPath)
diskInfo, err := fs.DiskUsage(v2dir)
if err != nil {
return nil, fmt.Errorf("error getting info of disk %s: %w", v2dir, err)
}
if options.verbose {
log.Info("Disk space info",
zap.String("Free space", humanize.Bytes(diskInfo.Free)),
zap.String("Requested space", humanize.Bytes(size)))
}
if size > diskInfo.Free {
return nil, fmt.Errorf("not enough space on target disk of %s: need %d, available %d ", v2dir, size, diskInfo.Free)
if err := checkDiskSpace(ui, opts, log); err != nil {
return nil, err
}

cqFile, err := os.OpenFile(v2opts.cqPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
Expand All @@ -71,15 +55,10 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
// export any continuous queries
for _, db := range v1.meta.Databases() {
if db.Name == "_internal" {
if options.verbose {
log.Info("Skipping _internal ")
}
log.Debug("Skipping _internal ")
continue
}
if options.verbose {
log.Info("Upgrading database ",
zap.String("database", db.Name))
}
log.Debug("Upgrading database", zap.String("database", db.Name))

// db to buckets IDs mapping
db2BucketIds[db.Name] = make([]influxdb.ID, 0, len(db.RetentionPolicies))
Expand All @@ -95,21 +74,15 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
RetentionPolicyName: rp.Name,
RetentionPeriod: rp.Duration,
}
if options.verbose {
log.Info("Creating bucket ",
zap.String("Bucket", bucket.Name))
}
log.Debug("Creating bucket", zap.String("Bucket", bucket.Name))
err = v2.bucketSvc.CreateBucket(ctx, bucket)
if err != nil {
return nil, fmt.Errorf("error creating bucket %s: %w", bucket.Name, err)

}

db2BucketIds[db.Name] = append(db2BucketIds[db.Name], bucket.ID)
if options.verbose {
log.Info("Creating database with retention policy",
zap.String("database", bucket.ID.String()))
}
log.Debug("Creating database with retention policy", zap.String("database", bucket.ID.String()))
spec := rp.ToSpec()
spec.Name = meta.DefaultRetentionPolicyName
dbv2, err := v2.meta.CreateDatabaseWithRetentionPolicy(bucket.ID.String(), spec)
Expand All @@ -124,25 +97,25 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
OrganizationID: orgID,
BucketID: bucket.ID,
}
if options.verbose {
log.Info("Creating mapping",
zap.String("database", mapping.Database),
zap.String("retention policy", mapping.RetentionPolicy),
zap.String("orgID", mapping.OrganizationID.String()),
zap.String("bucketID", mapping.BucketID.String()))
}
log.Debug(
"Creating mapping",
zap.String("database", mapping.Database),
zap.String("retention policy", mapping.RetentionPolicy),
zap.String("orgID", mapping.OrganizationID.String()),
zap.String("bucketID", mapping.BucketID.String()),
)
err = v2.dbrpSvc.Create(ctx, mapping)
if err != nil {
return nil, fmt.Errorf("error creating mapping %s/%s -> Org %s, bucket %s: %w", mapping.Database, mapping.RetentionPolicy, mapping.OrganizationID.String(), mapping.BucketID.String(), err)
}
shardsNum := 0
for _, sg := range rp.ShardGroups {
if options.verbose {
log.Info("Creating shard group",
zap.String("database", dbv2.Name),
zap.String("retention policy", dbv2.DefaultRetentionPolicy),
zap.Time("time", sg.StartTime))
}
log.Debug(
"Creating shard group",
zap.String("database", dbv2.Name),
zap.String("retention policy", dbv2.DefaultRetentionPolicy),
zap.Time("time", sg.StartTime),
)
shardsNum += len(sg.Shards)
_, err := v2.meta.CreateShardGroupWithShards(dbv2.Name, dbv2.DefaultRetentionPolicy, sg.StartTime, sg.Shards)
if err != nil {
Expand All @@ -152,11 +125,11 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
//empty retention policy doesn't have data
if shardsNum > 0 {
targetPath := filepath.Join(targetDataPath, dbv2.Name, spec.Name)
if options.verbose {
log.Info("Copying data",
zap.String("source", sourcePath),
zap.String("target", targetPath))
}
log.Debug(
"Copying data",
zap.String("source", sourcePath),
zap.String("target", targetPath),
)
err = CopyDir(sourcePath,
targetPath,
nil,
Expand All @@ -167,11 +140,11 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
}
sourcePath = filepath.Join(v1opts.walDir, db.Name, rp.Name)
targetPath = filepath.Join(targetWalPath, dbv2.Name, spec.Name)
if options.verbose {
log.Info("Copying wal",
zap.String("source", sourcePath),
zap.String("target", targetPath))
}
log.Debug(
"Copying wal",
zap.String("source", sourcePath),
zap.String("target", targetPath),
)
err = CopyDir(sourcePath,
targetPath,
nil,
Expand Down Expand Up @@ -204,9 +177,7 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt
}

for _, cq := range db.ContinuousQueries {
if options.verbose {
log.Info("Exporting CQ", zap.String("db", db.Name), zap.String("cq_name", cq.Name))
}
log.Debug("Exporting CQ", zap.String("db", db.Name), zap.String("cq_name", cq.Name))
padding := maxNameLen - len(cq.Name) + 1

_, err := cqFile.WriteString(fmt.Sprintf("%s%s%s\n", cq.Name, strings.Repeat(" ", padding), cq.Query))
Expand All @@ -222,3 +193,45 @@ func upgradeDatabases(ctx context.Context, v1 *influxDBv1, v2 *influxDBv2, v1opt

return db2BucketIds, nil
}

// checkDiskSpace ensures there is enough room at the target path to store
// a full copy of all V1 data.
func checkDiskSpace(ui *input.UI, opts *options, log *zap.Logger) error {
log.Info("Checking available disk space")

size, err := DirSize(opts.source.dataDir)
if err != nil {
return fmt.Errorf("error getting size of %s: %w", opts.source.dataDir, err)
}

walSize, err := DirSize(opts.source.walDir)
if err != nil {
return fmt.Errorf("error getting size of %s: %w", opts.source.walDir, err)
}
size += walSize

v2dir := filepath.Dir(opts.target.boltPath)
diskInfo, err := fs.DiskUsage(v2dir)
if err != nil {
return fmt.Errorf("error getting info of disk %s: %w", v2dir, err)
}

freeBytes := humanize.Bytes(diskInfo.Free)
requiredBytes := humanize.Bytes(size)
log.Info("Computed disk space", zap.String("free", freeBytes), zap.String("required", requiredBytes))

if size > diskInfo.Free {
return fmt.Errorf("not enough space on target disk of %s: need %d, available %d", v2dir, size, diskInfo.Free)
}
if !opts.force {
if confirmed := internal.GetConfirm(ui, func() string {
return fmt.Sprintf(`Proceeding will copy all V1 data to %q
Space available: %s
Space required: %s
`, v2dir, freeBytes, requiredBytes)
}); !confirmed {
return errors.New("upgrade was canceled")
}
}
return nil
}
13 changes: 9 additions & 4 deletions cmd/influxd/upgrade/database_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package upgrade

import (
"bytes"
"context"
"io/ioutil"
"net/http"
Expand All @@ -11,14 +12,14 @@ import (
"testing"

"github.com/dustin/go-humanize"

"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/bolt"
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
"github.com/influxdata/influxdb/v2/internal/testutil"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tcnksm/go-input"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -61,8 +62,8 @@ func TestUpgradeRealDB(t *testing.T) {
v2, err := newInfluxDBv2(ctx, v2opts, zap.NewNop())
require.Nil(t, err)

options.target = *v2opts
req, err := nonInteractive()
opts := &options{source: *v1opts, target: *v2opts, force: true}
req, err := nonInteractive(opts)
require.Nil(t, err)
assert.Equal(t, req.RetentionPeriod, humanize.Week, "Retention policy should pass through")

Expand All @@ -82,7 +83,11 @@ func TestUpgradeRealDB(t *testing.T) {
log, err := zap.NewDevelopment()
require.Nil(t, err)

db2bids, err := upgradeDatabases(ctx, v1, v2, v1opts, v2opts, resp.Org.ID, log)
ui := &input.UI{
Writer: &bytes.Buffer{},
Reader: &bytes.Buffer{},
}
db2bids, err := upgradeDatabases(ctx, ui, v1, v2, opts, resp.Org.ID, log)
require.Nil(t, err)

err = v2.close()
Expand Down
22 changes: 8 additions & 14 deletions cmd/influxd/upgrade/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strconv"
"time"
Expand All @@ -25,22 +24,22 @@ func setupAdmin(ctx context.Context, v2 *influxDBv2, req *influxdb.OnboardingReq
return res, nil
}

func isInteractive() bool {
func isInteractive(options *options) bool {
return !options.force ||
options.target.userName == "" ||
options.target.password == "" ||
options.target.orgName == "" ||
options.target.bucket == ""
}

func onboardingRequest() (*influxdb.OnboardingRequest, error) {
if isInteractive() {
return interactive()
func onboardingRequest(ui *input.UI, options *options) (*influxdb.OnboardingRequest, error) {
if isInteractive(options) {
return interactive(ui, options)
}
return nonInteractive()
return nonInteractive(options)
}

func nonInteractive() (*influxdb.OnboardingRequest, error) {
func nonInteractive(options *options) (*influxdb.OnboardingRequest, error) {
if len(options.target.password) < internal.MinPasswordLen {
return nil, internal.ErrPasswordIsTooShort
}
Expand All @@ -63,11 +62,7 @@ func nonInteractive() (*influxdb.OnboardingRequest, error) {
return req, nil
}

func interactive() (req *influxdb.OnboardingRequest, err error) {
ui := &input.UI{
Writer: os.Stdout,
Reader: os.Stdin,
}
func interactive(ui *input.UI, options *options) (req *influxdb.OnboardingRequest, err error) {
req = new(influxdb.OnboardingRequest)
fmt.Println(string(internal.PromptWithColor(`Welcome to InfluxDB 2.0 upgrade!`, internal.ColorYellow)))
if options.target.userName != "" {
Expand Down Expand Up @@ -119,8 +114,7 @@ func interactive() (req *influxdb.OnboardingRequest, err error) {
if req.RetentionPeriod > 0 {
rp = fmt.Sprintf("%d hrs", req.RetentionPeriod/time.Hour)
}
return fmt.Sprintf(`
You have entered:
return fmt.Sprintf(`You have entered:
Username: %s
Organization: %s
Bucket: %s
Expand Down
Loading

0 comments on commit 30306e5

Please sign in to comment.