Skip to content

Commit

Permalink
refactor(storage): introduce readonly only storage interface
Browse files Browse the repository at this point in the history
refactor(storage/fs): export SyncedStore ability to set snapshot

refactor(storage/fs/local): embed synced store directly

refactor(storage/fs/s3): embed synced store directly

refactor(storage/fs/git): embed synced store directly

refactor(storage/fs/oci): embed synced store directly

refactor(cmd/grpc): update calls to declarative backend stores

refactor(storage/fs): rename files from source to store

refactor(storage/fs/git): simplify condition where reference is a revision

refactor(storage/fs): remove store abstraction

fix(storage/fs/oci): remove errant error check from store test

refactor(storage/fs): rename SyncedStore to Store

refactor(storage/fs): add SnapshotStore functional transaction interface
  • Loading branch information
GeorgeMac committed Dec 15, 2023
1 parent 0466c1f commit 9f79b46
Show file tree
Hide file tree
Showing 24 changed files with 1,576 additions and 1,663 deletions.
155 changes: 4 additions & 151 deletions internal/cmd/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"sync"
"time"
Expand All @@ -20,7 +19,6 @@ import (
"go.flipt.io/flipt/internal/config"
"go.flipt.io/flipt/internal/containers"
"go.flipt.io/flipt/internal/info"
"go.flipt.io/flipt/internal/oci"
fliptserver "go.flipt.io/flipt/internal/server"
"go.flipt.io/flipt/internal/server/audit"
"go.flipt.io/flipt/internal/server/audit/logfile"
Expand All @@ -33,8 +31,7 @@ import (
middlewaregrpc "go.flipt.io/flipt/internal/server/middleware/grpc"
"go.flipt.io/flipt/internal/storage"
storagecache "go.flipt.io/flipt/internal/storage/cache"
"go.flipt.io/flipt/internal/storage/fs"
storageoci "go.flipt.io/flipt/internal/storage/fs/oci"
fsstore "go.flipt.io/flipt/internal/storage/fs/store"
fliptsql "go.flipt.io/flipt/internal/storage/sql"
"go.flipt.io/flipt/internal/storage/sql/mysql"
"go.flipt.io/flipt/internal/storage/sql/postgres"
Expand All @@ -52,20 +49,13 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/crypto/ssh"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"

"github.com/go-git/go-git/v5/plumbing/transport/http"
gitssh "github.com/go-git/go-git/v5/plumbing/transport/ssh"
"go.flipt.io/flipt/internal/storage/fs/git"
"go.flipt.io/flipt/internal/storage/fs/local"
"go.flipt.io/flipt/internal/storage/fs/s3"

grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
Expand Down Expand Up @@ -154,119 +144,12 @@ func NewGRPCServer(
}

logger.Debug("database driver configured", zap.Stringer("driver", driver))
case config.GitStorageType:
opts := []containers.Option[git.Source]{
git.WithRef(cfg.Storage.Git.Ref),
git.WithPollInterval(cfg.Storage.Git.PollInterval),
git.WithInsecureTLS(cfg.Storage.Git.InsecureSkipTLS),
}

if cfg.Storage.Git.CaCertBytes != "" {
opts = append(opts, git.WithCABundle([]byte(cfg.Storage.Git.CaCertBytes)))
} else if cfg.Storage.Git.CaCertPath != "" {
if bytes, err := os.ReadFile(cfg.Storage.Git.CaCertPath); err == nil {
opts = append(opts, git.WithCABundle(bytes))
} else {
return nil, err
}
}

auth := cfg.Storage.Git.Authentication
switch {
case auth.BasicAuth != nil:
opts = append(opts, git.WithAuth(&http.BasicAuth{
Username: auth.BasicAuth.Username,
Password: auth.BasicAuth.Password,
}))
case auth.TokenAuth != nil:
opts = append(opts, git.WithAuth(&http.TokenAuth{
Token: auth.TokenAuth.AccessToken,
}))
case auth.SSHAuth != nil:
var method *gitssh.PublicKeys
if auth.SSHAuth.PrivateKeyBytes != "" {
method, err = gitssh.NewPublicKeys(
auth.SSHAuth.User,
[]byte(auth.SSHAuth.PrivateKeyBytes),
auth.SSHAuth.Password,
)
} else {
method, err = gitssh.NewPublicKeysFromFile(
auth.SSHAuth.User,
auth.SSHAuth.PrivateKeyPath,
auth.SSHAuth.Password,
)
}
if err != nil {
return nil, err
}

// we're protecting against this explicitly so we can disable
// the gosec linting rule
if auth.SSHAuth.InsecureIgnoreHostKey {
// nolint:gosec
method.HostKeyCallback = ssh.InsecureIgnoreHostKey()
}

opts = append(opts, git.WithAuth(method))
}

source, err := git.NewSource(logger, cfg.Storage.Git.Repository, opts...)
if err != nil {
return nil, err
}

store, err = fs.NewStore(logger, source)
if err != nil {
return nil, err
}
case config.LocalStorageType:
source, err := local.NewSource(logger, cfg.Storage.Local.Path)
if err != nil {
return nil, err
}

store, err = fs.NewStore(logger, source)
if err != nil {
return nil, err
}
case config.ObjectStorageType:
store, err = NewObjectStore(cfg, logger)
if err != nil {
return nil, err
}
case config.OCIStorageType:
var opts []containers.Option[oci.StoreOptions]
if auth := cfg.Storage.OCI.Authentication; auth != nil {
opts = append(opts, oci.WithCredentials(
auth.Username,
auth.Password,
))
}

ocistore, err := oci.NewStore(logger, cfg.Storage.OCI.BundlesDirectory, opts...)
if err != nil {
return nil, err
}

ref, err := oci.ParseReference(cfg.Storage.OCI.Repository)
if err != nil {
return nil, err
}

source, err := storageoci.NewSource(logger, ocistore, ref,
storageoci.WithPollInterval(cfg.Storage.OCI.PollInterval),
)
if err != nil {
return nil, err
}

store, err = fs.NewStore(logger, source)
default:
// otherwise, attempt to configure a declarative backend store
store, err = fsstore.NewStore(ctx, logger, cfg)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unexpected storage type: %q", cfg.Storage.Type)
}

logger.Debug("store enabled", zap.Stringer("store", store))
Expand Down Expand Up @@ -507,36 +390,6 @@ func NewGRPCServer(
return server, nil
}

// NewObjectStore create a new storate.Store from the object config
func NewObjectStore(cfg *config.Config, logger *zap.Logger) (storage.Store, error) {
objectCfg := cfg.Storage.Object
var store storage.Store
// keep this as a case statement in anticipation of
// more object types in the future
// nolint:gocritic
switch objectCfg.Type {
case config.S3ObjectSubStorageType:
opts := []containers.Option[s3.Source]{
s3.WithPollInterval(objectCfg.S3.PollInterval),
}
if objectCfg.S3.Endpoint != "" {
opts = append(opts, s3.WithEndpoint(objectCfg.S3.Endpoint))
}
if objectCfg.S3.Region != "" {
opts = append(opts, s3.WithRegion(objectCfg.S3.Region))
}
source, err := s3.NewSource(logger, objectCfg.S3.Bucket, opts...)
if err != nil {
return nil, err
}
store, err = fs.NewStore(logger, source)
if err != nil {
return nil, err
}
}
return store, nil
}

// Run begins serving gRPC requests.
// This methods blocks until Shutdown is called.
func (s *GRPCServer) Run() error {
Expand Down
188 changes: 0 additions & 188 deletions internal/storage/fs/git/source.go

This file was deleted.

Loading

0 comments on commit 9f79b46

Please sign in to comment.