Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(authentication): per method cleanup process #1161

Merged
merged 35 commits into from
Nov 30, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
33995d9
feat(storage/oplock): new operation lock storage package
GeorgeMac Nov 23, 2022
b4830c8
fix(oplock/testing): use testify assert
GeorgeMac Nov 23, 2022
0cff45b
feat(cleanup): implement authentication cleanup service
GeorgeMac Nov 24, 2022
f5f2838
feat(migrations/postgres): create table operation lock
GeorgeMac Nov 25, 2022
d27448c
feat(migrations/cockroachdb): create table operation lock
GeorgeMac Nov 25, 2022
8d3cdb2
feat(migrations/mysql): create table operation lock
GeorgeMac Nov 25, 2022
c5c7713
refactor(errors): implement generic As function
GeorgeMac Nov 25, 2022
13375c2
chore: fix typo in doc string
GeorgeMac Nov 25, 2022
c781dc0
chore: remove accidental package-lock.json file
GeorgeMac Nov 25, 2022
e1f4256
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 25, 2022
a93b940
fix(config/auth): ensure env vars for cleanup are registered
GeorgeMac Nov 25, 2022
bc8d90e
chore(storage/oplock): increase wait time to reduce flake in test
GeorgeMac Nov 25, 2022
7ae9b1d
chore(storage/oplock): re-define test harness to be deterministic
GeorgeMac Nov 25, 2022
ca2d493
chore(storage/oplock): allow context cancelled on errgroup.Wait
GeorgeMac Nov 25, 2022
3b11b72
fix(migrations/mysql): use correct timestamp type
GeorgeMac Nov 25, 2022
7027e51
fix(migrations/mysql): reference correct primary key
GeorgeMac Nov 25, 2022
2ca5b07
chore(storage/oplock): more permissive error constraints on shutdown …
GeorgeMac Nov 25, 2022
1431184
chore(storage/oplock): use errors.Is in test assertion
GeorgeMac Nov 25, 2022
6a859c8
test(storage/auth): add failing test for non-expiring tokens and expi…
GeorgeMac Nov 25, 2022
e15625d
fix(auth/memory): fix failing condition for auths without expiry time…
GeorgeMac Nov 25, 2022
a007a78
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 25, 2022
b814a50
refactor(config/auth): move cleanup schedule beneath method
GeorgeMac Nov 28, 2022
8acdf15
Merge branch 'gm/auth-cleanup' of github.com:flipt-io/flipt into gm/a…
GeorgeMac Nov 28, 2022
2427689
fix(config/auth): ensure method is enabled when deciding on cleanup
GeorgeMac Nov 28, 2022
c9cc266
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 28, 2022
6939815
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 29, 2022
ec13bbf
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 29, 2022
59f7db9
fix(config/auth): validate interval and grace period on cleanup
GeorgeMac Nov 29, 2022
70c182c
refactor(config/auth): move validation out of func into loop body
GeorgeMac Nov 29, 2022
5254f94
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 29, 2022
b20baac
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 30, 2022
6d08bc8
Merge branch 'main' into gm/auth-cleanup
GeorgeMac Nov 30, 2022
c86fa95
chore(sql/testing): set connection max idle time and lifetime
GeorgeMac Nov 30, 2022
e25f763
fix(sql/testing): establish timeouts only in tests
GeorgeMac Nov 30, 2022
8193c46
test(sql): ping connection with timeout before returning
GeorgeMac Nov 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion cmd/flipt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/phyber/negroni-gzip/gzip"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"go.flipt.io/flipt/internal/cleanup"
"go.flipt.io/flipt/internal/config"
"go.flipt.io/flipt/internal/info"
"go.flipt.io/flipt/internal/server"
Expand All @@ -41,6 +42,7 @@ import (
"go.flipt.io/flipt/internal/storage"
authstorage "go.flipt.io/flipt/internal/storage/auth"
authsql "go.flipt.io/flipt/internal/storage/auth/sql"
oplocksql "go.flipt.io/flipt/internal/storage/oplock/sql"
"go.flipt.io/flipt/internal/storage/sql"
"go.flipt.io/flipt/internal/storage/sql/mysql"
"go.flipt.io/flipt/internal/storage/sql/postgres"
Expand Down Expand Up @@ -455,7 +457,21 @@ func run(ctx context.Context, logger *zap.Logger) error {
otelgrpc.UnaryServerInterceptor(),
}

authenticationStore := authsql.NewStore(driver, sql.BuilderFor(db, driver), logger)
var (
sqlBuilder = sql.BuilderFor(db, driver)
authenticationStore = authsql.NewStore(driver, sqlBuilder, logger)
operationLockService = oplocksql.New(logger, driver, sqlBuilder)
)

if cfg.Authentication.ShouldRunCleanup() {
cleanupAuthService := cleanup.NewAuthenticationService(logger, operationLockService, authenticationStore, cfg.Authentication)
cleanupAuthService.Run(ctx)

shutdownFuncs = append(shutdownFuncs, func(context.Context) {
_ = cleanupAuthService.Stop()
logger.Info("cleanup service has been shutdown")
})
}

// only enable enforcement middleware if authentication required
if cfg.Authentication.Required {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS operation_lock;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS operation_lock (
operation VARCHAR(255) PRIMARY KEY UNIQUE NOT NULL,
version INTEGER DEFAULT 0 NOT NULL,
last_acquired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
acquired_until TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS operation_lock;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS operation_lock (
operation VARCHAR(255) UNIQUE NOT NULL,
version INTEGER DEFAULT 0 NOT NULL,
last_acquired_at TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
acquired_until TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
PRIMARY KEY (`operation`)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS operation_lock;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS operation_lock (
operation VARCHAR(255) PRIMARY KEY UNIQUE NOT NULL,
version INTEGER DEFAULT 0 NOT NULL,
last_acquired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
acquired_until TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS operation_lock;
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS operation_lock (
operation VARCHAR(255) PRIMARY KEY UNIQUE NOT NULL,
version INTEGER DEFAULT 0 NOT NULL,
last_acquired_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
acquired_until TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
6 changes: 6 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"fmt"
)

// As is a utility for one-lining errors.As statements.
// e.g. cerr, match := errors.As[MyCustomError](err).
func As[E error](err error) (e E, _ bool) {
return e, errors.As(err, &e)
}

// New creates a new error with errors.New
func New(s string) error {
return errors.New(s)
Expand Down
114 changes: 114 additions & 0 deletions internal/cleanup/cleanup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package cleanup

import (
"context"
"fmt"
"time"

"go.flipt.io/flipt/internal/config"
authstorage "go.flipt.io/flipt/internal/storage/auth"
"go.flipt.io/flipt/internal/storage/oplock"
"go.flipt.io/flipt/rpc/flipt/auth"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

const minCleanupInterval = 5 * time.Minute

// AuthenticationService is configured to run background goroutines which
// will clear out expired authentication tokens.
type AuthenticationService struct {
logger *zap.Logger
lock oplock.Service
store authstorage.Store
config config.AuthenticationConfig

errgroup errgroup.Group
cancel func()
}

// NewAuthenticationService constructs and configures a new instance of authentication service.
func NewAuthenticationService(logger *zap.Logger, lock oplock.Service, store authstorage.Store, config config.AuthenticationConfig) *AuthenticationService {
return &AuthenticationService{
logger: logger,
lock: lock,
store: store,
config: config,
cancel: func() {},
}
}

func (s *AuthenticationService) schedules() map[auth.Method]config.AuthenticationCleanupSchedule {
schedules := map[auth.Method]config.AuthenticationCleanupSchedule{}
if s.config.Methods.Token.Cleanup != nil {
schedules[auth.Method_METHOD_TOKEN] = *s.config.Methods.Token.Cleanup
}

return schedules
}

// Run starts up a background goroutine per configure authentication method schedule.
func (s *AuthenticationService) Run(ctx context.Context) {
ctx, s.cancel = context.WithCancel(ctx)

for method, schedule := range s.schedules() {
var (
method = method
schedule = schedule
operation = oplock.Operation(fmt.Sprintf("cleanup_auth_%s", method))
)

s.errgroup.Go(func() error {
// on the first attempt to run the cleanup authentication service
// we attempt to obtain the lock immediately. If the lock is already
// held the service should return false and return the current acquired
// current timestamp
acquiredUntil := time.Now().UTC()
for {
select {
case <-ctx.Done():
return nil
case <-time.After(time.Until(acquiredUntil)):
}

acquired, entry, err := s.lock.TryAcquire(ctx, operation, schedule.Interval)
if err != nil {
// ensure we dont go into hot loop when the operation lock service
// enters an error state by ensuring we sleep for at-least the minimum
// interval.
now := time.Now().UTC()
if acquiredUntil.Before(now) {
acquiredUntil = now.Add(minCleanupInterval)
}

s.logger.Warn("attempting to acquire lock", zap.Error(err))
continue
}

// update the next sleep target to current entries acquired until
acquiredUntil = entry.AcquiredUntil

if !acquired {
s.logger.Info("cleanup process not acquired", zap.Time("next_attempt", entry.AcquiredUntil))
continue
}

expiredBefore := time.Now().UTC().Add(-schedule.GracePeriod)
s.logger.Info("cleanup process deleting authentications", zap.Time("expired_before", expiredBefore))
if err := s.store.DeleteAuthentications(ctx, authstorage.Delete(
authstorage.WithMethod(method),
authstorage.WithExpiredBefore(expiredBefore),
)); err != nil {
s.logger.Error("attempting to delete expired authentications", zap.Error(err))
}
}
})
}
}

// Stop signals for the cleanup goroutines to cancel and waits for them to finish.
func (s *AuthenticationService) Stop() error {
s.cancel()

return s.errgroup.Wait()
}
95 changes: 95 additions & 0 deletions internal/cleanup/cleanup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package cleanup

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.flipt.io/flipt/internal/config"
authstorage "go.flipt.io/flipt/internal/storage/auth"
inmemauth "go.flipt.io/flipt/internal/storage/auth/memory"
inmemoplock "go.flipt.io/flipt/internal/storage/oplock/memory"
"go.flipt.io/flipt/rpc/flipt/auth"
"go.uber.org/zap/zaptest"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestCleanup(t *testing.T) {
var (
ctx = context.Background()
logger = zaptest.NewLogger(t)
authstore = inmemauth.NewStore()
lock = inmemoplock.New()
authConfig = config.AuthenticationConfig{
Methods: config.AuthenticationMethods{
Token: config.AuthenticationMethodTokenConfig{
Enabled: true,
Cleanup: &config.AuthenticationCleanupSchedule{
Interval: time.Second,
GracePeriod: 5 * time.Second,
},
},
},
}
)

// create an initial non-expiring token
clientToken, storedAuth, err := authstore.CreateAuthentication(
ctx,
&authstorage.CreateAuthenticationRequest{Method: auth.Method_METHOD_TOKEN},
)
require.NoError(t, err)

for i := 0; i < 5; i++ {
// run five instances of service
// it should be a safe operation given they share the same lock service
service := NewAuthenticationService(logger, lock, authstore, authConfig)
service.Run(ctx)
defer func() {
require.NoError(t, service.Stop())
}()
}

t.Run("ensure non-expiring token exists", func(t *testing.T) {
retrievedAuth, err := authstore.GetAuthenticationByClientToken(ctx, clientToken)
require.NoError(t, err)
assert.Equal(t, storedAuth, retrievedAuth)
})

t.Run("create an expiring token and ensure it exists", func(t *testing.T) {
clientToken, storedAuth, err = authstore.CreateAuthentication(
ctx,
&authstorage.CreateAuthenticationRequest{
Method: auth.Method_METHOD_TOKEN,
ExpiresAt: timestamppb.New(time.Now().UTC().Add(5 * time.Second)),
},
)
require.NoError(t, err)

retrievedAuth, err := authstore.GetAuthenticationByClientToken(ctx, clientToken)
require.NoError(t, err)
assert.Equal(t, storedAuth, retrievedAuth)
})

t.Run("ensure grace period protects token from being deleted", func(t *testing.T) {
// token should still exist as it wont be deleted until
// expiry + grace period (5s + 5s == 10s)
time.Sleep(5 * time.Second)

retrievedAuth, err := authstore.GetAuthenticationByClientToken(ctx, clientToken)
require.NoError(t, err)
assert.Equal(t, storedAuth, retrievedAuth)

// ensure authentication is expired but still persisted
assert.True(t, retrievedAuth.ExpiresAt.AsTime().Before(time.Now().UTC()))
})

t.Run("once expiry and grace period ellapses ensure token is deleted", func(t *testing.T) {
time.Sleep(10 * time.Second)

_, err := authstore.GetAuthenticationByClientToken(ctx, clientToken)
require.Error(t, err, "resource not found")
})
}
67 changes: 57 additions & 10 deletions internal/config/authentication.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
package config

import "github.com/spf13/viper"
import (
"strings"
"time"

var _ defaulter = (*AuthenticationConfig)(nil)
"github.com/spf13/viper"
"go.flipt.io/flipt/rpc/flipt/auth"
)

var (
_ defaulter = (*AuthenticationConfig)(nil)
stringToAuthMethod = map[string]auth.Method{}
)

func init() {
for method, v := range auth.Method_value {
if auth.Method(v) == auth.Method_METHOD_NONE {
continue
}

name := strings.ToLower(strings.TrimPrefix(method, "METHOD_"))
stringToAuthMethod[name] = auth.Method(v)
}
}

// AuthenticationConfig configures Flipts authentication mechanisms
type AuthenticationConfig struct {
Expand All @@ -11,30 +31,57 @@ type AuthenticationConfig struct {
// Else, authentication is not required and Flipt's APIs are not secured.
Required bool `json:"required,omitempty" mapstructure:"required"`

Methods struct {
Token AuthenticationMethodTokenConfig `json:"token,omitempty" mapstructure:"token"`
} `json:"methods,omitempty" mapstructure:"methods"`
Methods AuthenticationMethods `json:"methods,omitempty" mapstructure:"methods"`
}

// ShouldRunCleanup returns true if the cleanup background process should be started.
// It returns true given at-least 1 method is enabled and it's associated schedule
// has been configured (non-nil).
func (c AuthenticationConfig) ShouldRunCleanup() bool {
return (c.Methods.Token.Enabled && c.Methods.Token.Cleanup != nil)
}

func (a *AuthenticationConfig) setDefaults(v *viper.Viper) []string {
func (c *AuthenticationConfig) setDefaults(v *viper.Viper) []string {
token := map[string]any{
"enabled": false,
}

if v.GetBool("authentication.methods.token.enabled") {
token["cleanup"] = map[string]any{
"interval": time.Hour,
"grace_period": 30 * time.Minute,
}
}

v.SetDefault("authentication", map[string]any{
"required": false,
"methods": map[string]any{
"token": map[string]any{
"enabled": false,
},
"token": token,
},
})

return nil
}

// AuthenticationMethods is a set of configuration for each authentication
// method available for use within Flipt.
type AuthenticationMethods struct {
Token AuthenticationMethodTokenConfig `json:"token,omitempty" mapstructure:"token"`
}

// AuthenticationMethodTokenConfig contains fields used to configure the authentication
// method "token".
// This authentication method supports the ability to create static tokens via the
// /auth/v1/method/token prefix of endpoints.
type AuthenticationMethodTokenConfig struct {
// Enabled designates whether or not static token authentication is enabled
// and whether Flipt will mount the "token" method APIs.
Enabled bool `json:"enabled,omitempty" mapstructure:"enabled"`
Enabled bool `json:"enabled,omitempty" mapstructure:"enabled"`
Cleanup *AuthenticationCleanupSchedule `json:"cleanup,omitempty" mapstructure:"cleanup"`
}

// AuthenticationCleanupSchedule is used to configure a cleanup goroutine.
type AuthenticationCleanupSchedule struct {
Interval time.Duration `json:"interval,omitempty" mapstructure:"interval"`
GracePeriod time.Duration `json:"gracePeriod,omitempty" mapstructure:"grace_period"`
}
Loading