feat: replicate based on bucket name rather than id (#23638)
* feat: add the ability to replicate based on bucket name rather than bucket id.

- This adds compatibility with 1.x replication targets

* fix: improve error checking and add tests

* fix: add additional constraint to replications table

* fix: use OR not AND for constraint

* feat: delete invalid replications on downgrade

* fix: should be less than 2.4

* test: add test around down migration and cleanup migration code

* fix: use nil instead of platform.ID(1) for better consistency

* fix: fix tests

* fix: fix tests
jeffreyssmith2nd authored Aug 18, 2022
1 parent 48fb5ce commit 6f50e70
Showing 10 changed files with 347 additions and 44 deletions.
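
In short, a replication can now identify its remote bucket by name when no remote bucket ID is available, which is what 1.x-compatible targets require. As a rough sketch of how a caller might build such a request using the CreateReplicationRequest fields added in replication.go below (the IDs, bucket name, and surrounding program are illustrative only, not code from this commit; import paths assume the repository's kit/platform layout):

```go
package main

import (
	"fmt"

	influxdb "github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/kit/platform"
)

func main() {
	// Hypothetical IDs, for illustration only.
	orgID, _ := platform.IDFromString("1000000000000001")
	remoteID, _ := platform.IDFromString("1000000000000002")
	localBucketID, _ := platform.IDFromString("1000000000000003")

	// Target the remote bucket by name (a 1.x database/retention-policy
	// style target); RemoteBucketID is simply left unset.
	req := influxdb.CreateReplicationRequest{
		OrgID:            *orgID,
		Name:             "edge-to-1x",
		RemoteID:         *remoteID,
		LocalBucketID:    *localBucketID,
		RemoteBucketName: "telegraf",
	}
	fmt.Printf("%+v\n", req)
}
```

At create time the store treats the ID as authoritative when both ID and name are supplied, and rejects requests that supply neither (see errMissingIDName in store.go below).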
41 changes: 41 additions & 0 deletions cmd/influxd/downgrade/downgrade.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/influxdata/influxdb/v2"
@@ -35,6 +36,7 @@ type migrationTarget struct {
var downgradeMigrationTargets = map[string]migrationTarget{
"2.0": {kvMigration: 15, sqlMigration: 0},
"2.1": {kvMigration: 18, sqlMigration: 3},
"2.3": {kvMigration: 20, sqlMigration: 6},
}

func NewCommand(ctx context.Context, v *viper.Viper) (*cobra.Command, error) {
@@ -120,6 +122,12 @@ The target version of the downgrade must be specified, i.e. "influxd downgrade 2
func downgrade(ctx context.Context, boltPath, sqlitePath, targetVersion string, log *zap.Logger) error {
info := influxdb.GetBuildInfo()

n, err := compareVersionStrings(targetVersion, "2.4.0")
if n < 0 || err != nil {
errStr := "if the target version is less than 2.4.0, any replications using bucket names rather than ids will be deleted"
log.Warn("downgrade warning", zap.String("targetVersion", errStr))
}

// Files must exist at the specified paths for the downgrade to work properly. The bolt and sqlite "open" methods will
// create files if they do not exist, so their existence must be verified here.
if _, err := os.Stat(boltPath); err != nil {
@@ -174,3 +182,36 @@ func downgrade(ctx context.Context, boltPath, sqlitePath, targetVersion string,
zap.String("version", targetVersion))
return nil
}

func compareVersionStrings(left string, right string) (int, error) {
l := strings.Split(left, ".")
r := strings.Split(right, ".")
loop := len(r)
if len(l) > len(r) {
loop = len(l)
}
for i := 0; i < loop; i++ {
var x, y string
if len(l) > i {
x = l[i]
}
if len(r) > i {
y = r[i]
}
lefti, err := strconv.Atoi(x)
if err != nil {
return 0, err
}
righti, err := strconv.Atoi(y)
if err != nil {
return 0, err
}

if lefti > righti {
return 1, nil
} else if lefti < righti {
return -1, nil
}
}
return 0, nil
}
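
For context, here is a hypothetical table-style check of the guard above, in the spirit of the tests this commit mentions adding (a sketch only, not the committed test file; it assumes the file's package is named downgrade so the unexported function is reachable):

```go
package downgrade

import "testing"

// Sketch: every pre-2.4 downgrade target compares as "less than" 2.4.0, so
// the bucket-name replication warning in downgrade() fires for each of them.
func TestCompareVersionStringsSketch(t *testing.T) {
	cases := []struct {
		left, right string
		want        int
	}{
		{"2.0", "2.4.0", -1},
		{"2.1", "2.4.0", -1},
		{"2.3", "2.4.0", -1},
		{"2.4.1", "2.4.0", 1}, // a hypothetical newer target would not warn
	}
	for _, c := range cases {
		got, err := compareVersionStrings(c.left, c.right)
		if err != nil {
			t.Fatalf("compareVersionStrings(%q, %q): unexpected error: %v", c.left, c.right, err)
		}
		if got != c.want {
			t.Fatalf("compareVersionStrings(%q, %q) = %d, want %d", c.left, c.right, got, c.want)
		}
	}
}
```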
30 changes: 17 additions & 13 deletions replication.go
@@ -20,19 +20,20 @@ var ErrMaxQueueSizeTooSmall = errors.Error{

// Replication contains all info about a replication that should be returned to users.
type Replication struct {
ID platform.ID `json:"id" db:"id"`
OrgID platform.ID `json:"orgID" db:"org_id"`
Name string `json:"name" db:"name"`
Description *string `json:"description,omitempty" db:"description"`
RemoteID platform.ID `json:"remoteID" db:"remote_id"`
LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"`
RemoteBucketID platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"`
LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"`
ID platform.ID `json:"id" db:"id"`
OrgID platform.ID `json:"orgID" db:"org_id"`
Name string `json:"name" db:"name"`
Description *string `json:"description,omitempty" db:"description"`
RemoteID platform.ID `json:"remoteID" db:"remote_id"`
LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"`
RemoteBucketID *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"`
RemoteBucketName string `json:"RemoteBucketName" db:"remote_bucket_name"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"`
CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"`
LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"`
LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"`
DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"`
MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"`
}

// ReplicationListFilter is a selection filter for listing replications.
@@ -65,6 +66,7 @@ type CreateReplicationRequest struct {
RemoteID platform.ID `json:"remoteID"`
LocalBucketID platform.ID `json:"localBucketID"`
RemoteBucketID platform.ID `json:"remoteBucketID"`
RemoteBucketName string `json:"remoteBucketName"`
MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes,omitempty"`
DropNonRetryableData bool `json:"dropNonRetryableData,omitempty"`
MaxAgeSeconds int64 `json:"maxAgeSeconds,omitempty"`
@@ -84,6 +86,7 @@ type UpdateReplicationRequest struct {
Description *string `json:"description,omitempty"`
RemoteID *platform.ID `json:"remoteID,omitempty"`
RemoteBucketID *platform.ID `json:"remoteBucketID,omitempty"`
RemoteBucketName *string `json:"remoteBucketName,omitempty"`
MaxQueueSizeBytes *int64 `json:"maxQueueSizeBytes,omitempty"`
DropNonRetryableData *bool `json:"dropNonRetryableData,omitempty"`
MaxAgeSeconds *int64 `json:"maxAgeSeconds,omitempty"`
@@ -109,5 +112,6 @@ type ReplicationHTTPConfig struct {
RemoteOrgID platform.ID `db:"remote_org_id"`
AllowInsecureTLS bool `db:"allow_insecure_tls"`
RemoteBucketID platform.ID `db:"remote_bucket_id"`
RemoteBucketName string `db:"remote_bucket_name"`
DropNonRetryableData bool `db:"drop_non_retryable_data"`
}
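
The update path mirrors this: UpdateReplicationRequest uses pointer fields, so a caller can switch an existing replication to a named remote bucket without touching anything else. A minimal sketch (the bucket name and surrounding program are illustrative, not from this commit):

```go
package main

import (
	"fmt"

	influxdb "github.com/influxdata/influxdb/v2"
)

func main() {
	// Only RemoteBucketName is set; all other pointer fields stay nil, so an
	// update built from this request touches just the remote_bucket_name column.
	name := "telegraf"
	upd := influxdb.UpdateReplicationRequest{
		RemoteBucketName: &name,
	}
	fmt.Printf("%+v\n", upd)
}
```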
57 changes: 38 additions & 19 deletions replications/internal/store.go
@@ -19,6 +19,11 @@ var errReplicationNotFound = &ierrors.Error{
Msg: "replication not found",
}

var errMissingIDName = &ierrors.Error{
Code: ierrors.EUnprocessableEntity,
Msg: "one of remote_bucket_id, remote_bucket_name should be provided",
}

func errRemoteNotFound(id platform.ID, cause error) error {
return &ierrors.Error{
Code: ierrors.EInvalid,
@@ -48,7 +53,7 @@ func (s *Store) Unlock() {
// ListReplications returns a list of replications matching the provided filter.
func (s *Store) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) {
q := sq.Select(
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id",
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", "remote_bucket_name",
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data",
"max_age_seconds").
From("replications")
@@ -81,22 +86,33 @@ func (s *Store) ListReplications(ctx context.Context, filter influxdb.Replicatio

// CreateReplication persists a new replication in the database. Caller is responsible for managing locks.
func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) {
fields := sq.Eq{
"id": newID,
"org_id": request.OrgID,
"name": request.Name,
"description": request.Description,
"remote_id": request.RemoteID,
"local_bucket_id": request.LocalBucketID,
"max_queue_size_bytes": request.MaxQueueSizeBytes,
"drop_non_retryable_data": request.DropNonRetryableData,
"max_age_seconds": request.MaxAgeSeconds,
"created_at": "datetime('now')",
"updated_at": "datetime('now')",
}

if request.RemoteBucketID != platform.ID(0) {
fields["remote_bucket_id"] = request.RemoteBucketID
fields["remote_bucket_name"] = ""
} else if request.RemoteBucketName != "" {
fields["remote_bucket_id"] = nil
fields["remote_bucket_name"] = request.RemoteBucketName
} else {
return nil, errMissingIDName
}

q := sq.Insert("replications").
SetMap(sq.Eq{
"id": newID,
"org_id": request.OrgID,
"name": request.Name,
"description": request.Description,
"remote_id": request.RemoteID,
"local_bucket_id": request.LocalBucketID,
"remote_bucket_id": request.RemoteBucketID,
"max_queue_size_bytes": request.MaxQueueSizeBytes,
"drop_non_retryable_data": request.DropNonRetryableData,
"max_age_seconds": request.MaxAgeSeconds,
"created_at": "datetime('now')",
"updated_at": "datetime('now')",
}).
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")
SetMap(fields).
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, remote_bucket_name, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")

query, args, err := q.ToSql()
if err != nil {
@@ -118,7 +134,7 @@ func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, reques
// GetReplication gets a replication by ID from the database.
func (s *Store) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) {
q := sq.Select(
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id",
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", "remote_bucket_name",
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data",
"max_age_seconds").
From("replications").
@@ -155,6 +171,9 @@ func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request i
if request.RemoteBucketID != nil {
updates["remote_bucket_id"] = *request.RemoteBucketID
}
if request.RemoteBucketName != nil {
updates["remote_bucket_name"] = *request.RemoteBucketName
}
if request.MaxQueueSizeBytes != nil {
updates["max_queue_size_bytes"] = *request.MaxQueueSizeBytes
}
@@ -166,7 +185,7 @@ func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request i
}

q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}).
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, remote_bucket_name, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds")

query, args, err := q.ToSql()
if err != nil {
@@ -249,7 +268,7 @@ func (s *Store) DeleteBucketReplications(ctx context.Context, localBucketID plat
}

func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*influxdb.ReplicationHTTPConfig, error) {
q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id", "r.drop_non_retryable_data").
q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id", "r.remote_bucket_name", "r.drop_non_retryable_data").
From("replications r").InnerJoin("remotes c ON r.remote_id = c.id AND r.id = ?", id)

query, args, err := q.ToSql()
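To summarize the create-time rule spread across the store diff above: the remote bucket ID wins when both ID and name are supplied, the name is used only when the ID is unset, and a request with neither is rejected. A standalone restatement follows (not the committed code; the error text is copied from errMissingIDName, the helper name is made up, and the kit/platform import path is assumed to match the repo layout):

```go
package main

import (
	"errors"
	"fmt"

	"github.com/influxdata/influxdb/v2/kit/platform"
)

// remoteBucketColumns mirrors the branch in Store.CreateReplication: exactly
// one of the two remote-bucket columns ends up populated.
func remoteBucketColumns(id platform.ID, name string) (colID interface{}, colName string, err error) {
	switch {
	case id != platform.ID(0):
		return id, "", nil
	case name != "":
		return nil, name, nil
	default:
		return nil, "", errors.New("one of remote_bucket_id, remote_bucket_name should be provided")
	}
}

func main() {
	// A name-only request, as a 1.x-compatible replication would send.
	colID, colName, err := remoteBucketColumns(0, "telegraf")
	fmt.Println(colID, colName, err) // <nil> telegraf <nil>
}
```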
