From 6f50e70960d82bce02d9276e67739f22b125361e Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Thu, 18 Aug 2022 14:21:59 -0400 Subject: [PATCH] feat: replicate based on bucket name rather than id (#23638) * feat: add the ability to replicate based on bucket name rather than bucket id. - This adds compatibility with 1.x replication targets * fix: improve error checking and add tests * fix: add additional constraint to replications table * fix: use OR not AND for constraint * feat: delete invalid replications on downgrade * fix: should be less than 2.4 * test: add test around down migration and cleanup migration code * fix: use nil instead of platform.ID(1) for better consistency * fix: fix tests * fix: fix tests --- cmd/influxd/downgrade/downgrade.go | 41 +++++ replication.go | 30 ++-- replications/internal/store.go | 57 ++++--- replications/internal/store_test.go | 142 +++++++++++++++++- replications/remotewrite/writer.go | 7 +- replications/service.go | 4 + replications/service_test.go | 13 +- replications/transport/http_test.go | 8 +- ...rate_replications_add_bucket_name.down.sql | 43 ++++++ ...igrate_replications_add_bucket_name.up.sql | 46 ++++++ 10 files changed, 347 insertions(+), 44 deletions(-) create mode 100644 sqlite/migrations/0007_migrate_replications_add_bucket_name.down.sql create mode 100644 sqlite/migrations/0007_migrate_replications_add_bucket_name.up.sql diff --git a/cmd/influxd/downgrade/downgrade.go b/cmd/influxd/downgrade/downgrade.go index 10b89b7840b..e5402849613 100644 --- a/cmd/influxd/downgrade/downgrade.go +++ b/cmd/influxd/downgrade/downgrade.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "github.com/influxdata/influxdb/v2" @@ -35,6 +36,7 @@ type migrationTarget struct { var downgradeMigrationTargets = map[string]migrationTarget{ "2.0": {kvMigration: 15, sqlMigration: 0}, "2.1": {kvMigration: 18, sqlMigration: 3}, + "2.3": {kvMigration: 20, sqlMigration: 6}, } func NewCommand(ctx context.Context, v *viper.Viper) (*cobra.Command, error) { @@ -120,6 +122,12 @@ The target version of the downgrade must be specified, i.e. "influxd downgrade 2 func downgrade(ctx context.Context, boltPath, sqlitePath, targetVersion string, log *zap.Logger) error { info := influxdb.GetBuildInfo() + n, err := compareVersionStrings(targetVersion, "2.4.0") + if n < 0 || err != nil { + errStr := "if the target version is less than 2.4.0, any replications using bucket names rather than ids will be deleted" + log.Warn("downgrade warning", zap.String("targetVersion", errStr)) + } + // Files must exist at the specified paths for the downgrade to work properly. The bolt and sqlite "open" methods will // create files if they do not exist, so their existence must be verified here. if _, err := os.Stat(boltPath); err != nil { @@ -174,3 +182,36 @@ func downgrade(ctx context.Context, boltPath, sqlitePath, targetVersion string, zap.String("version", targetVersion)) return nil } + +func compareVersionStrings(left string, right string) (int, error) { + l := strings.Split(left, ".") + r := strings.Split(right, ".") + loop := len(r) + if len(l) > len(r) { + loop = len(l) + } + for i := 0; i < loop; i++ { + var x, y string + if len(l) > i { + x = l[i] + } + if len(r) > i { + y = r[i] + } + lefti, err := strconv.Atoi(x) + if err != nil { + return 0, err + } + righti, err := strconv.Atoi(y) + if err != nil { + return 0, err + } + + if lefti > righti { + return 1, nil + } else if lefti < righti { + return -1, nil + } + } + return 0, nil +} diff --git a/replication.go b/replication.go index e1523ed7b9d..9333d5b8209 100644 --- a/replication.go +++ b/replication.go @@ -20,19 +20,20 @@ var ErrMaxQueueSizeTooSmall = errors.Error{ // Replication contains all info about a replication that should be returned to users. type Replication struct { - ID platform.ID `json:"id" db:"id"` - OrgID platform.ID `json:"orgID" db:"org_id"` - Name string `json:"name" db:"name"` - Description *string `json:"description,omitempty" db:"description"` - RemoteID platform.ID `json:"remoteID" db:"remote_id"` - LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"` - RemoteBucketID platform.ID `json:"remoteBucketID" db:"remote_bucket_id"` - MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"` - CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"` - LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"` - LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"` - DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"` - MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"` + ID platform.ID `json:"id" db:"id"` + OrgID platform.ID `json:"orgID" db:"org_id"` + Name string `json:"name" db:"name"` + Description *string `json:"description,omitempty" db:"description"` + RemoteID platform.ID `json:"remoteID" db:"remote_id"` + LocalBucketID platform.ID `json:"localBucketID" db:"local_bucket_id"` + RemoteBucketID *platform.ID `json:"remoteBucketID" db:"remote_bucket_id"` + RemoteBucketName string `json:"RemoteBucketName" db:"remote_bucket_name"` + MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes" db:"max_queue_size_bytes"` + CurrentQueueSizeBytes int64 `json:"currentQueueSizeBytes" db:"current_queue_size_bytes"` + LatestResponseCode *int32 `json:"latestResponseCode,omitempty" db:"latest_response_code"` + LatestErrorMessage *string `json:"latestErrorMessage,omitempty" db:"latest_error_message"` + DropNonRetryableData bool `json:"dropNonRetryableData" db:"drop_non_retryable_data"` + MaxAgeSeconds int64 `json:"maxAgeSeconds" db:"max_age_seconds"` } // ReplicationListFilter is a selection filter for listing replications. @@ -65,6 +66,7 @@ type CreateReplicationRequest struct { RemoteID platform.ID `json:"remoteID"` LocalBucketID platform.ID `json:"localBucketID"` RemoteBucketID platform.ID `json:"remoteBucketID"` + RemoteBucketName string `json:"remoteBucketName"` MaxQueueSizeBytes int64 `json:"maxQueueSizeBytes,omitempty"` DropNonRetryableData bool `json:"dropNonRetryableData,omitempty"` MaxAgeSeconds int64 `json:"maxAgeSeconds,omitempty"` @@ -84,6 +86,7 @@ type UpdateReplicationRequest struct { Description *string `json:"description,omitempty"` RemoteID *platform.ID `json:"remoteID,omitempty"` RemoteBucketID *platform.ID `json:"remoteBucketID,omitempty"` + RemoteBucketName *string `json:"remoteBucketName,omitempty"` MaxQueueSizeBytes *int64 `json:"maxQueueSizeBytes,omitempty"` DropNonRetryableData *bool `json:"dropNonRetryableData,omitempty"` MaxAgeSeconds *int64 `json:"maxAgeSeconds,omitempty"` @@ -109,5 +112,6 @@ type ReplicationHTTPConfig struct { RemoteOrgID platform.ID `db:"remote_org_id"` AllowInsecureTLS bool `db:"allow_insecure_tls"` RemoteBucketID platform.ID `db:"remote_bucket_id"` + RemoteBucketName string `db:"remote_bucket_name"` DropNonRetryableData bool `db:"drop_non_retryable_data"` } diff --git a/replications/internal/store.go b/replications/internal/store.go index 77c946767dd..24ef62836b0 100644 --- a/replications/internal/store.go +++ b/replications/internal/store.go @@ -19,6 +19,11 @@ var errReplicationNotFound = &ierrors.Error{ Msg: "replication not found", } +var errMissingIDName = &ierrors.Error{ + Code: ierrors.EUnprocessableEntity, + Msg: "one of remote_bucket_id, remote_bucket_name should be provided", +} + func errRemoteNotFound(id platform.ID, cause error) error { return &ierrors.Error{ Code: ierrors.EInvalid, @@ -48,7 +53,7 @@ func (s *Store) Unlock() { // ListReplications returns a list of replications matching the provided filter. func (s *Store) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { q := sq.Select( - "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", + "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", "remote_bucket_name", "max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data", "max_age_seconds"). From("replications") @@ -81,22 +86,33 @@ func (s *Store) ListReplications(ctx context.Context, filter influxdb.Replicatio // CreateReplication persists a new replication in the database. Caller is responsible for managing locks. func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) { + fields := sq.Eq{ + "id": newID, + "org_id": request.OrgID, + "name": request.Name, + "description": request.Description, + "remote_id": request.RemoteID, + "local_bucket_id": request.LocalBucketID, + "max_queue_size_bytes": request.MaxQueueSizeBytes, + "drop_non_retryable_data": request.DropNonRetryableData, + "max_age_seconds": request.MaxAgeSeconds, + "created_at": "datetime('now')", + "updated_at": "datetime('now')", + } + + if request.RemoteBucketID != platform.ID(0) { + fields["remote_bucket_id"] = request.RemoteBucketID + fields["remote_bucket_name"] = "" + } else if request.RemoteBucketName != "" { + fields["remote_bucket_id"] = nil + fields["remote_bucket_name"] = request.RemoteBucketName + } else { + return nil, errMissingIDName + } + q := sq.Insert("replications"). - SetMap(sq.Eq{ - "id": newID, - "org_id": request.OrgID, - "name": request.Name, - "description": request.Description, - "remote_id": request.RemoteID, - "local_bucket_id": request.LocalBucketID, - "remote_bucket_id": request.RemoteBucketID, - "max_queue_size_bytes": request.MaxQueueSizeBytes, - "drop_non_retryable_data": request.DropNonRetryableData, - "max_age_seconds": request.MaxAgeSeconds, - "created_at": "datetime('now')", - "updated_at": "datetime('now')", - }). - Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds") + SetMap(fields). + Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, remote_bucket_name, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds") query, args, err := q.ToSql() if err != nil { @@ -118,7 +134,7 @@ func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, reques // GetReplication gets a replication by ID from the database. func (s *Store) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) { q := sq.Select( - "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", + "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", "remote_bucket_name", "max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data", "max_age_seconds"). From("replications"). @@ -155,6 +171,9 @@ func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request i if request.RemoteBucketID != nil { updates["remote_bucket_id"] = *request.RemoteBucketID } + if request.RemoteBucketName != nil { + updates["remote_bucket_name"] = *request.RemoteBucketName + } if request.MaxQueueSizeBytes != nil { updates["max_queue_size_bytes"] = *request.MaxQueueSizeBytes } @@ -166,7 +185,7 @@ func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request i } q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}). - Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds") + Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, remote_bucket_name, max_queue_size_bytes, drop_non_retryable_data, max_age_seconds") query, args, err := q.ToSql() if err != nil { @@ -249,7 +268,7 @@ func (s *Store) DeleteBucketReplications(ctx context.Context, localBucketID plat } func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*influxdb.ReplicationHTTPConfig, error) { - q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id", "r.drop_non_retryable_data"). + q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id", "r.remote_bucket_name", "r.drop_non_retryable_data"). From("replications r").InnerJoin("remotes c ON r.remote_id = c.id AND r.id = ?", id) query, args, err := q.ToSql() diff --git a/replications/internal/store_test.go b/replications/internal/store_test.go index ba106242e74..41623f68725 100644 --- a/replications/internal/store_test.go +++ b/replications/internal/store_test.go @@ -27,7 +27,7 @@ var ( Description: &desc, RemoteID: platform.ID(100), LocalBucketID: platform.ID(1000), - RemoteBucketID: platform.ID(99999), + RemoteBucketID: idPointer(99999), MaxQueueSizeBytes: 3 * influxdb.DefaultReplicationMaxQueueSizeBytes, MaxAgeSeconds: 0, } @@ -37,7 +37,7 @@ var ( Description: replication.Description, RemoteID: replication.RemoteID, LocalBucketID: replication.LocalBucketID, - RemoteBucketID: replication.RemoteBucketID, + RemoteBucketID: *replication.RemoteBucketID, MaxQueueSizeBytes: replication.MaxQueueSizeBytes, MaxAgeSeconds: replication.MaxAgeSeconds, } @@ -46,7 +46,7 @@ var ( RemoteToken: replication.RemoteID.String(), RemoteOrgID: platform.ID(888888), AllowInsecureTLS: true, - RemoteBucketID: replication.RemoteBucketID, + RemoteBucketID: *replication.RemoteBucketID, } newRemoteID = platform.ID(200) newQueueSize = influxdb.MinReplicationMaxQueueSizeBytes @@ -69,6 +69,11 @@ var ( } ) +func idPointer(id int) *platform.ID { + p := platform.ID(id) + return &p +} + func TestCreateAndGetReplication(t *testing.T) { t.Parallel() @@ -93,6 +98,91 @@ func TestCreateAndGetReplication(t *testing.T) { require.Equal(t, replication, *got) } +func TestCreateAndGetReplicationName(t *testing.T) { + t.Parallel() + + testStore, clean := newTestStore(t) + defer clean(t) + + insertRemote(t, testStore, replication.RemoteID) + + // Getting an invalid ID should return an error. + got, err := testStore.GetReplication(ctx, initID) + require.Equal(t, errReplicationNotFound, err) + require.Nil(t, got) + + req := createReq + req.RemoteBucketID = platform.ID(0) + req.RemoteBucketName = "testbucket" + expected := replication + expected.RemoteBucketName = "testbucket" + expected.RemoteBucketID = nil + + // Create a replication, check the results. + created, err := testStore.CreateReplication(ctx, initID, req) + require.NoError(t, err) + require.Equal(t, expected, *created) + + // Read the created replication and assert it matches the creation response. + got, err = testStore.GetReplication(ctx, created.ID) + require.NoError(t, err) + require.Equal(t, expected, *got) +} + +func TestCreateAndGetReplicationNameAndID(t *testing.T) { + t.Parallel() + + testStore, clean := newTestStore(t) + defer clean(t) + + insertRemote(t, testStore, replication.RemoteID) + + // Getting an invalid ID should return an error. + got, err := testStore.GetReplication(ctx, initID) + require.Equal(t, errReplicationNotFound, err) + require.Nil(t, got) + + req := createReq + req.RemoteBucketID = platform.ID(100) + req.RemoteBucketName = "testbucket" + expected := replication + expected.RemoteBucketName = "" + expected.RemoteBucketID = idPointer(100) + + // Create a replication, check the results. + created, err := testStore.CreateReplication(ctx, initID, req) + require.NoError(t, err) + require.Equal(t, expected, *created) + + // Read the created replication and assert it matches the creation response. + got, err = testStore.GetReplication(ctx, created.ID) + require.NoError(t, err) + require.Equal(t, expected, *got) +} + +func TestCreateAndGetReplicationNameError(t *testing.T) { + t.Parallel() + + testStore, clean := newTestStore(t) + defer clean(t) + + insertRemote(t, testStore, replication.RemoteID) + + // Getting an invalid ID should return an error. + got, err := testStore.GetReplication(ctx, initID) + require.Equal(t, errReplicationNotFound, err) + require.Nil(t, got) + + req := createReq + req.RemoteBucketID = platform.ID(0) + req.RemoteBucketName = "" + + // Create a replication, should fail due to missing params + created, err := testStore.CreateReplication(ctx, initID, req) + require.Equal(t, errMissingIDName, err) + require.Nil(t, created) +} + func TestCreateMissingRemote(t *testing.T) { t.Parallel() @@ -381,6 +471,52 @@ func TestListReplications(t *testing.T) { }) } +func TestMigrateDownFromReplicationsWithName(t *testing.T) { + t.Parallel() + + testStore, clean := newTestStore(t) + defer clean(t) + + insertRemote(t, testStore, replication.RemoteID) + + req := createReq + req.RemoteBucketID = platform.ID(100) + _, err := testStore.CreateReplication(ctx, platform.ID(10), req) + require.NoError(t, err) + + req.RemoteBucketID = platform.ID(0) + req.RemoteBucketName = "testbucket" + req.Name = "namedrepl" + _, err = testStore.CreateReplication(ctx, platform.ID(20), req) + require.NoError(t, err) + + replications, err := testStore.ListReplications(context.Background(), influxdb.ReplicationListFilter{OrgID: replication.OrgID}) + require.NoError(t, err) + require.Equal(t, 2, len(replications.Replications)) + + logger := zaptest.NewLogger(t) + sqliteMigrator := sqlite.NewMigrator(testStore.sqlStore, logger) + require.NoError(t, sqliteMigrator.Down(ctx, 6, migrations.AllDown)) + + // Can't use ListReplications because it expects the `remote_bucket_name` column to be there in this version of influx. + q := sq.Select( + "id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", + "max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data", + "max_age_seconds"). + From("replications") + + q = q.Where(sq.Eq{"org_id": replication.OrgID}) + + query, args, err := q.ToSql() + require.NoError(t, err) + var rs influxdb.Replications + if err := testStore.sqlStore.DB.SelectContext(ctx, &rs.Replications, query, args...); err != nil { + require.NoError(t, err) + } + require.Equal(t, 1, len(rs.Replications)) + require.Equal(t, platform.ID(10), rs.Replications[0].ID) +} + func TestGetFullHTTPConfig(t *testing.T) { t.Parallel() diff --git a/replications/remotewrite/writer.go b/replications/remotewrite/writer.go index 73e22943b85..511d576b412 100644 --- a/replications/remotewrite/writer.go +++ b/replications/remotewrite/writer.go @@ -208,9 +208,14 @@ func PostWrite(ctx context.Context, config *influxdb.ReplicationHTTPConfig, data conf.HTTPClient.Timeout = timeout client := api.NewAPIClient(conf).WriteApi + bucket := config.RemoteBucketID.String() + if config.RemoteBucketName != "" { + bucket = config.RemoteBucketName + } + req := client.PostWrite(ctx). Org(config.RemoteOrgID.String()). - Bucket(config.RemoteBucketID.String()). + Bucket(bucket). Body(data) // Don't set the encoding header for empty bodies, like those used for validation. diff --git a/replications/service.go b/replications/service.go index 039d084a088..155ffb3a197 100644 --- a/replications/service.go +++ b/replications/service.go @@ -140,6 +140,10 @@ func (s *service) CreateReplication(ctx context.Context, request influxdb.Create s.store.Lock() defer s.store.Unlock() + if request.RemoteID == platform.ID(0) && request.RemoteBucketName == "" { + return nil, fmt.Errorf("please supply one of: remoteBucketID, remoteBucketName") + } + if _, err := s.bucketService.FindBucketByID(ctx, request.LocalBucketID); err != nil { return nil, errLocalBucketNotFound(request.LocalBucketID, err) } diff --git a/replications/service_test.go b/replications/service_test.go index c10551e5baf..5f5a0b809e8 100644 --- a/replications/service_test.go +++ b/replications/service_test.go @@ -38,7 +38,7 @@ var ( Description: &desc, RemoteID: platform.ID(100), LocalBucketID: platform.ID(1000), - RemoteBucketID: platform.ID(99999), + RemoteBucketID: idPointer(99999), MaxQueueSizeBytes: 3 * influxdb.DefaultReplicationMaxQueueSizeBytes, } replication2 = influxdb.Replication{ @@ -48,7 +48,7 @@ var ( Description: &desc, RemoteID: platform.ID(100), LocalBucketID: platform.ID(1000), - RemoteBucketID: platform.ID(99999), + RemoteBucketID: idPointer(99999), MaxQueueSizeBytes: 3 * influxdb.DefaultReplicationMaxQueueSizeBytes, } createReq = influxdb.CreateReplicationRequest{ @@ -57,7 +57,7 @@ var ( Description: replication1.Description, RemoteID: replication1.RemoteID, LocalBucketID: replication1.LocalBucketID, - RemoteBucketID: replication1.RemoteBucketID, + RemoteBucketID: *replication1.RemoteBucketID, MaxQueueSizeBytes: replication1.MaxQueueSizeBytes, } newRemoteID = platform.ID(200) @@ -94,10 +94,15 @@ var ( RemoteToken: replication1.RemoteID.String(), RemoteOrgID: platform.ID(888888), AllowInsecureTLS: true, - RemoteBucketID: replication1.RemoteBucketID, + RemoteBucketID: *replication1.RemoteBucketID, } ) +func idPointer(id int) *platform.ID { + p := platform.ID(id) + return &p +} + func TestListReplications(t *testing.T) { t.Parallel() diff --git a/replications/transport/http_test.go b/replications/transport/http_test.go index 3703efbdcd6..6406298820a 100644 --- a/replications/transport/http_test.go +++ b/replications/transport/http_test.go @@ -35,7 +35,7 @@ var ( OrgID: *orgID, RemoteID: *remoteID, LocalBucketID: *localBucketId, - RemoteBucketID: *remoteBucketID, + RemoteBucketID: remoteBucketID, Name: "example", MaxQueueSizeBytes: influxdb.DefaultReplicationMaxQueueSizeBytes, } @@ -79,7 +79,7 @@ func TestReplicationHandler(t *testing.T) { Name: testReplication.Name, RemoteID: testReplication.RemoteID, LocalBucketID: testReplication.LocalBucketID, - RemoteBucketID: testReplication.RemoteBucketID, + RemoteBucketID: *testReplication.RemoteBucketID, } t.Run("with explicit queue size", func(t *testing.T) { @@ -126,7 +126,7 @@ func TestReplicationHandler(t *testing.T) { Name: testReplication.Name, RemoteID: testReplication.RemoteID, LocalBucketID: testReplication.LocalBucketID, - RemoteBucketID: testReplication.RemoteBucketID, + RemoteBucketID: *testReplication.RemoteBucketID, } t.Run("with explicit queue size", func(t *testing.T) { @@ -289,7 +289,7 @@ func TestReplicationHandler(t *testing.T) { Name: testReplication.Name, RemoteID: testReplication.RemoteID, LocalBucketID: testReplication.LocalBucketID, - RemoteBucketID: testReplication.RemoteBucketID, + RemoteBucketID: *testReplication.RemoteBucketID, MaxQueueSizeBytes: influxdb.MinReplicationMaxQueueSizeBytes / 2, } diff --git a/sqlite/migrations/0007_migrate_replications_add_bucket_name.down.sql b/sqlite/migrations/0007_migrate_replications_add_bucket_name.down.sql new file mode 100644 index 00000000000..375e6e540c7 --- /dev/null +++ b/sqlite/migrations/0007_migrate_replications_add_bucket_name.down.sql @@ -0,0 +1,43 @@ +-- Adds the "NOT NULL" to `remote_bucket_id` and removes `remote_bucket_name`. +ALTER TABLE replications RENAME TO _replications_old; + +CREATE TABLE replications +( + id VARCHAR(16) NOT NULL PRIMARY KEY, + org_id VARCHAR(16) NOT NULL, + name TEXT NOT NULL, + description TEXT, + remote_id VARCHAR(16) NOT NULL, + local_bucket_id VARCHAR(16) NOT NULL, + remote_bucket_id VARCHAR(16) NOT NULL, + max_queue_size_bytes INTEGER NOT NULL, + max_age_seconds INTEGER NOT NULL, + latest_response_code INTEGER, + latest_error_message TEXT, + drop_non_retryable_data BOOLEAN NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + + CONSTRAINT replications_uniq_orgid_name UNIQUE (org_id, name), + FOREIGN KEY (remote_id) REFERENCES remotes (id) +); + +INSERT INTO replications SELECT + id, + org_id, + name, + description, + remote_id, + local_bucket_id, + remote_bucket_id, + max_queue_size_bytes, + max_age_seconds, + latest_response_code, + latest_error_message, + drop_non_retryable_data, + created_at,updated_at FROM _replications_old WHERE remote_bucket_name = ''; +DROP TABLE _replications_old; + +-- Create indexes on lookup patterns we expect to be common +CREATE INDEX idx_local_bucket_id_per_org ON replications (org_id, local_bucket_id); +CREATE INDEX idx_remote_id_per_org ON replications (org_id, remote_id); \ No newline at end of file diff --git a/sqlite/migrations/0007_migrate_replications_add_bucket_name.up.sql b/sqlite/migrations/0007_migrate_replications_add_bucket_name.up.sql new file mode 100644 index 00000000000..ba4f3020445 --- /dev/null +++ b/sqlite/migrations/0007_migrate_replications_add_bucket_name.up.sql @@ -0,0 +1,46 @@ +-- Removes the "NOT NULL" from `remote_bucket_id` and adds `remote_bucket_name`. +ALTER TABLE replications RENAME TO _replications_old; + +CREATE TABLE replications +( + id VARCHAR(16) NOT NULL PRIMARY KEY, + org_id VARCHAR(16) NOT NULL, + name TEXT NOT NULL, + description TEXT, + remote_id VARCHAR(16) NOT NULL, + local_bucket_id VARCHAR(16) NOT NULL, + remote_bucket_id VARCHAR(16), + remote_bucket_name TEXT DEFAULT '', + max_queue_size_bytes INTEGER NOT NULL, + max_age_seconds INTEGER NOT NULL, + latest_response_code INTEGER, + latest_error_message TEXT, + drop_non_retryable_data BOOLEAN NOT NULL, + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + + CONSTRAINT replications_uniq_orgid_name UNIQUE (org_id, name), + CONSTRAINT replications_one_of_id_name CHECK (remote_bucket_id IS NOT NULL OR remote_bucket_name != ''), + FOREIGN KEY (remote_id) REFERENCES remotes (id) +); + +INSERT INTO replications ( + id, + org_id, + name, + description, + remote_id, + local_bucket_id, + remote_bucket_id, + max_queue_size_bytes, + max_age_seconds, + latest_response_code, + latest_error_message, + drop_non_retryable_data, + created_at,updated_at +) SELECT * FROM _replications_old; +DROP TABLE _replications_old; + +-- Create indexes on lookup patterns we expect to be common +CREATE INDEX idx_local_bucket_id_per_org ON replications (org_id, local_bucket_id); +CREATE INDEX idx_remote_id_per_org ON replications (org_id, remote_id); \ No newline at end of file