-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: move replications store functionality to separate package (#…
…22923) * refactor: move replications store functionality to separate package * fix: make opening all repls on startup work right
- Loading branch information
1 parent
5a1e375
commit f47d514
Showing
5 changed files
with
1,550 additions
and
794 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,255 @@ | ||
package internal | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"errors" | ||
"fmt" | ||
|
||
sq "github.com/Masterminds/squirrel" | ||
"github.com/influxdata/influxdb/v2" | ||
"github.com/influxdata/influxdb/v2/kit/platform" | ||
ierrors "github.com/influxdata/influxdb/v2/kit/platform/errors" | ||
"github.com/influxdata/influxdb/v2/sqlite" | ||
"github.com/mattn/go-sqlite3" | ||
) | ||
|
||
var errReplicationNotFound = &ierrors.Error{ | ||
Code: ierrors.ENotFound, | ||
Msg: "replication not found", | ||
} | ||
|
||
func errRemoteNotFound(id platform.ID, cause error) error { | ||
return &ierrors.Error{ | ||
Code: ierrors.EInvalid, | ||
Msg: fmt.Sprintf("remote %q not found", id), | ||
Err: cause, | ||
} | ||
} | ||
|
||
type Store struct { | ||
sqlStore *sqlite.SqlStore | ||
} | ||
|
||
func NewStore(sqlStore *sqlite.SqlStore) *Store { | ||
return &Store{ | ||
sqlStore: sqlStore, | ||
} | ||
} | ||
|
||
func (s *Store) Lock() { | ||
s.sqlStore.Mu.Lock() | ||
} | ||
|
||
func (s *Store) Unlock() { | ||
s.sqlStore.Mu.Unlock() | ||
} | ||
|
||
// ListReplications returns a list of replications matching the provided filter. | ||
func (s *Store) ListReplications(ctx context.Context, filter influxdb.ReplicationListFilter) (*influxdb.Replications, error) { | ||
q := sq.Select( | ||
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", | ||
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data"). | ||
From("replications") | ||
|
||
if filter.OrgID.Valid() { | ||
q = q.Where(sq.Eq{"org_id": filter.OrgID}) | ||
} | ||
if filter.Name != nil { | ||
q = q.Where(sq.Eq{"name": *filter.Name}) | ||
} | ||
if filter.RemoteID != nil { | ||
q = q.Where(sq.Eq{"remote_id": *filter.RemoteID}) | ||
} | ||
if filter.LocalBucketID != nil { | ||
q = q.Where(sq.Eq{"local_bucket_id": *filter.LocalBucketID}) | ||
} | ||
|
||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var rs influxdb.Replications | ||
if err := s.sqlStore.DB.SelectContext(ctx, &rs.Replications, query, args...); err != nil { | ||
return nil, err | ||
} | ||
|
||
return &rs, nil | ||
} | ||
|
||
// CreateReplication persists a new replication in the database. Caller is responsible for managing locks. | ||
func (s *Store) CreateReplication(ctx context.Context, newID platform.ID, request influxdb.CreateReplicationRequest) (*influxdb.Replication, error) { | ||
q := sq.Insert("replications"). | ||
SetMap(sq.Eq{ | ||
"id": newID, | ||
"org_id": request.OrgID, | ||
"name": request.Name, | ||
"description": request.Description, | ||
"remote_id": request.RemoteID, | ||
"local_bucket_id": request.LocalBucketID, | ||
"remote_bucket_id": request.RemoteBucketID, | ||
"max_queue_size_bytes": request.MaxQueueSizeBytes, | ||
"drop_non_retryable_data": request.DropNonRetryableData, | ||
"created_at": "datetime('now')", | ||
"updated_at": "datetime('now')", | ||
}). | ||
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data") | ||
|
||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var r influxdb.Replication | ||
|
||
if err := s.sqlStore.DB.GetContext(ctx, &r, query, args...); err != nil { | ||
if sqlErr, ok := err.(sqlite3.Error); ok && sqlErr.ExtendedCode == sqlite3.ErrConstraintForeignKey { | ||
return nil, errRemoteNotFound(request.RemoteID, err) | ||
} | ||
return nil, err | ||
} | ||
|
||
return &r, nil | ||
} | ||
|
||
// GetReplication gets a replication by ID from the database. | ||
func (s *Store) GetReplication(ctx context.Context, id platform.ID) (*influxdb.Replication, error) { | ||
q := sq.Select( | ||
"id", "org_id", "name", "description", "remote_id", "local_bucket_id", "remote_bucket_id", | ||
"max_queue_size_bytes", "latest_response_code", "latest_error_message", "drop_non_retryable_data"). | ||
From("replications"). | ||
Where(sq.Eq{"id": id}) | ||
|
||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var r influxdb.Replication | ||
if err := s.sqlStore.DB.GetContext(ctx, &r, query, args...); err != nil { | ||
if errors.Is(err, sql.ErrNoRows) { | ||
return nil, errReplicationNotFound | ||
} | ||
return nil, err | ||
} | ||
|
||
return &r, nil | ||
} | ||
|
||
// UpdateReplication updates a replication by ID. Caller is responsible for managing locks. | ||
func (s *Store) UpdateReplication(ctx context.Context, id platform.ID, request influxdb.UpdateReplicationRequest) (*influxdb.Replication, error) { | ||
updates := sq.Eq{"updated_at": sq.Expr("datetime('now')")} | ||
if request.Name != nil { | ||
updates["name"] = *request.Name | ||
} | ||
if request.Description != nil { | ||
updates["description"] = *request.Description | ||
} | ||
if request.RemoteID != nil { | ||
updates["remote_id"] = *request.RemoteID | ||
} | ||
if request.RemoteBucketID != nil { | ||
updates["remote_bucket_id"] = *request.RemoteBucketID | ||
} | ||
if request.MaxQueueSizeBytes != nil { | ||
updates["max_queue_size_bytes"] = *request.MaxQueueSizeBytes | ||
} | ||
if request.DropNonRetryableData != nil { | ||
updates["drop_non_retryable_data"] = *request.DropNonRetryableData | ||
} | ||
|
||
q := sq.Update("replications").SetMap(updates).Where(sq.Eq{"id": id}). | ||
Suffix("RETURNING id, org_id, name, description, remote_id, local_bucket_id, remote_bucket_id, max_queue_size_bytes, drop_non_retryable_data") | ||
|
||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var r influxdb.Replication | ||
if err := s.sqlStore.DB.GetContext(ctx, &r, query, args...); err != nil { | ||
if errors.Is(err, sql.ErrNoRows) { | ||
return nil, errReplicationNotFound | ||
} | ||
if sqlErr, ok := err.(sqlite3.Error); ok && request.RemoteID != nil && sqlErr.ExtendedCode == sqlite3.ErrConstraintForeignKey { | ||
return nil, errRemoteNotFound(*request.RemoteID, err) | ||
} | ||
return nil, err | ||
} | ||
|
||
return &r, nil | ||
} | ||
|
||
// DeleteReplication deletes a replication by ID from the database. Caller is responsible for managing locks. | ||
func (s *Store) DeleteReplication(ctx context.Context, id platform.ID) error { | ||
q := sq.Delete("replications").Where(sq.Eq{"id": id}).Suffix("RETURNING id") | ||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var d platform.ID | ||
if err := s.sqlStore.DB.GetContext(ctx, &d, query, args...); err != nil { | ||
if errors.Is(err, sql.ErrNoRows) { | ||
return errReplicationNotFound | ||
} | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// DeleteBucketReplications deletes the replications for the provided localBucketID from the database. Caller is | ||
// responsible for managing locks. A list of deleted IDs is returned for further processing by the caller. | ||
func (s *Store) DeleteBucketReplications(ctx context.Context, localBucketID platform.ID) ([]platform.ID, error) { | ||
q := sq.Delete("replications").Where(sq.Eq{"local_bucket_id": localBucketID}).Suffix("RETURNING id") | ||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var deleted []platform.ID | ||
if err := s.sqlStore.DB.SelectContext(ctx, &deleted, query, args...); err != nil { | ||
return nil, err | ||
} | ||
|
||
return deleted, nil | ||
} | ||
|
||
func (s *Store) GetFullHTTPConfig(ctx context.Context, id platform.ID) (*ReplicationHTTPConfig, error) { | ||
q := sq.Select("c.remote_url", "c.remote_api_token", "c.remote_org_id", "c.allow_insecure_tls", "r.remote_bucket_id"). | ||
From("replications r").InnerJoin("remotes c ON r.remote_id = c.id AND r.id = ?", id) | ||
|
||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
var rc ReplicationHTTPConfig | ||
if err := s.sqlStore.DB.GetContext(ctx, &rc, query, args...); err != nil { | ||
if errors.Is(err, sql.ErrNoRows) { | ||
return nil, errReplicationNotFound | ||
} | ||
return nil, err | ||
} | ||
return &rc, nil | ||
} | ||
|
||
func (s *Store) PopulateRemoteHTTPConfig(ctx context.Context, id platform.ID, target *ReplicationHTTPConfig) error { | ||
q := sq.Select("remote_url", "remote_api_token", "remote_org_id", "allow_insecure_tls"). | ||
From("remotes").Where(sq.Eq{"id": id}) | ||
query, args, err := q.ToSql() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if err := s.sqlStore.DB.GetContext(ctx, target, query, args...); err != nil { | ||
if errors.Is(err, sql.ErrNoRows) { | ||
return errRemoteNotFound(id, nil) | ||
} | ||
return err | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.