Skip to content

Commit

Permalink
use local cache
Browse files Browse the repository at this point in the history
  • Loading branch information
jr0d committed Apr 5, 2021
1 parent 056cb65 commit 3f26f0f
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 71 deletions.
14 changes: 11 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/mesosphere/traefik-forward-auth/internal/storage"
"github.com/mesosphere/traefik-forward-auth/internal/storage/cluster"
"net/http"
"os"
"time"

"github.com/gorilla/sessions"
Expand All @@ -17,7 +18,7 @@ import (
// Main
func main() {
// Parse options
config := internal.NewGlobalConfig()
config := internal.NewGlobalConfig(os.Args[1:])

// Setup logger
log := logger.NewDefaultLogger(config.LogLevel, config.LogFormat)
Expand All @@ -29,7 +30,7 @@ func main() {
config.SetOidcProvider()

// Get clientset for Authorizers
var clientset *k8s.Clientset
var clientset k8s.Interface
if config.EnableRBAC || config.EnableInClusterStorage {
var err error
clientset, err = kubernetes.GetClientSet()
Expand All @@ -53,8 +54,15 @@ func main() {
SessionName: config.ClaimsSessionName,
}
} else {
clusterStorage := cluster.NewClusterStore(clientset, config.ClusterStoreNamespace, config.Lifetime)
clusterStorage := cluster.NewClusterStore(
clientset,
config.ClusterStoreNamespace,
string(config.Secret),
config.Lifetime,
time.Duration(config.ClusterStoreCacheTTL)*time.Second)

gc := cluster.NewGC(clusterStorage, time.Minute, false, true)

if err := gc.Start(); err != nil {
log.Fatalf("error starting GC process: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/api/storage/v1alpha1/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package v1alpha1

type UserInfo struct {
Username string
Email string
Groups []string
Username string `json:"username"`
Email string `json:"email"`
Groups []string `json:"groups"`
}
9 changes: 5 additions & 4 deletions internal/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,13 @@ func matchCookieDomains(domain string) (bool, string) {
// Remove port
p := strings.Split(domain, ":")

for _, d := range config.CookieDomains {
if d.Match(p[0]) {
return true, d.Domain
if config != nil {
for _, d := range config.CookieDomains {
if d.Match(p[0]) {
return true, d.Domain
}
}
}

return false, p[0]
}

Expand Down
6 changes: 4 additions & 2 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Config struct {
SessionKey string `long:"session-key" env:"SESSION_KEY" description:"A session key used to encrypt browser sessions"`
GroupsAttributeName string `long:"groups-attribute-name" env:"GROUPS_ATTRIBUTE_NAME" default:"groups" description:"Map the correct attribute that contain the user groups"`
ClusterStoreNamespace string `long:"cluster-store-namespace" env:"CLUSTER_STORE_NAMESPACE" default:"default" description:"Namespace to store userinfo secrets"`
ClusterStoreCacheTTL int `long:"cluster-store-cache-ttl" env:"CLUSTER_STORE_CACHE_TTL" default:"60" description:"TTL (in seconds) of the internal secret cache"`

// RBAC
EnableRBAC bool `long:"enable-rbac" env:"ENABLE_RBAC" description:"Indicates that RBAC support should be enabled"`
AuthZPassThrough CommaSeparatedList `long:"authz-pass-through" env:"AUTHZ_PASS_THROUGH" description:"One or more routes which bypass authorization checks"`
Expand All @@ -72,9 +74,9 @@ type Config struct {
ServiceAccountToken string
}

func NewGlobalConfig() *Config {
func NewGlobalConfig(args []string) *Config {
var err error
config, err = NewConfig(os.Args[1:])
config, err = NewConfig(args)
if err != nil {
fmt.Printf("%+v\n", err)
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion internal/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const (

// GetClientSet will attempt to get an external cluster configuration if the KUBECONFIG environment
// variable is set. Otherwise will attempt to get an in-cluster configuration.
func GetClientSet() (*k8s.Clientset, error) {
func GetClientSet() (k8s.Interface, error) {
configPath := os.Getenv(KubeConfigEnv)
var config *rest.Config
var err error
Expand Down
64 changes: 64 additions & 0 deletions internal/storage/cluster/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cluster

import (
"github.com/mesosphere/traefik-forward-auth/internal/api/storage/v1alpha1"
"sync"
"time"
)

type UserInfoRecord struct {
created time.Time
userInfo *v1alpha1.UserInfo
}

// UserInfoCache is a simple hit or miss cache which is used to reduce calls to
// the apiserver. The client-go cache module and higher level informer caches rely on apiserver
// events to synchronize local caches. Since the clusterstore will update secrets during auth callback,
// we cannot be certain that these updates would have occurred on subsequent requests without
// triggering and waiting on a full resync. Unfortunately, Resync is not exposed on Informer objects,
// thus we need to query the actual apiserver state, instead of relying on the local caches.
type UserInfoCache struct {
TTL time.Duration

infos map[string]UserInfoRecord
lock sync.Mutex
}

func NewUserInfoCache(ttl time.Duration) *UserInfoCache {
infos := make(map[string]UserInfoRecord)
return &UserInfoCache{
TTL: ttl,
infos: infos,
lock: sync.Mutex{},
}
}

func (uc *UserInfoCache) Get(claimsId string) *v1alpha1.UserInfo {
record, ok := uc.infos[claimsId]
if !ok {
return nil
}

now := time.Now()
if now.Sub(record.created.Add(uc.TTL)) >= 0 {
// expired delete record
uc.Delete(claimsId)
return nil
}
return record.userInfo
}

func (uc *UserInfoCache) Save(claimsId string, info *v1alpha1.UserInfo) {
record := UserInfoRecord{
created: time.Now(),
userInfo: info,
}
uc.infos[claimsId] = record
return
}

func (uc *UserInfoCache) Delete(claimsId string) {
uc.lock.Lock()
defer uc.lock.Unlock()
delete(uc.infos, claimsId)
}
128 changes: 70 additions & 58 deletions internal/storage/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,19 @@ import (
"encoding/json"
"errors"
"fmt"
tfa "github.com/mesosphere/traefik-forward-auth/internal"
"github.com/mesosphere/traefik-forward-auth/internal/api/storage/v1alpha1"
"github.com/mesosphere/traefik-forward-auth/internal/storage"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
listers "k8s.io/client-go/listers/core/v1"
"math/rand"
"net/http"
"strings"
"time"
)

const (
informerSyncDuration = time.Minute * 10
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

tfa "github.com/mesosphere/traefik-forward-auth/internal"
"github.com/mesosphere/traefik-forward-auth/internal/api/storage/v1alpha1"
"github.com/mesosphere/traefik-forward-auth/internal/storage"
)

var (
Expand All @@ -43,29 +37,25 @@ func SecretError(msg string) error {
var logger = logrus.New()

type ClusterStorage struct {
Client *kubernetes.Clientset
Client kubernetes.Interface
Namespace string
HmacSecret []byte
Lifetime time.Duration
GCInterval time.Duration

secrets v1.SecretInterface
secretLister listers.SecretLister
sharedInformerFactory informers.SharedInformerFactory
informerStop chan struct{}
selector labels.Selector
ticker time.Ticker
ticker time.Ticker

cache *UserInfoCache
}

func NewClusterStore(client *kubernetes.Clientset, namespace string, expiry time.Duration) *ClusterStorage {
func NewClusterStore(client kubernetes.Interface, namespace, hmacSecret string, expiry, cacheTTL time.Duration) *ClusterStorage {
cs := &ClusterStorage{
Client: client,
Namespace: namespace,
secrets: client.CoreV1().Secrets(namespace),
Lifetime: expiry,
selector: labels.Set(map[string]string{storage.ClaimsLabel: "true"}).AsSelector(),
Client: client,
Namespace: namespace,
Lifetime: expiry,
HmacSecret: []byte(hmacSecret),
cache: NewUserInfoCache(cacheTTL),
}
cs.prepareCache()
return cs
}

Expand All @@ -75,18 +65,7 @@ func (cs *ClusterStorage) Get(r *http.Request) (*v1alpha1.UserInfo, error) {
return nil, err
}

var userinfo *v1alpha1.UserInfo
secret, err := cs.getSecretByClaim(claimsId)
if err != nil {
return nil, err
}

userinfo, err = cs.getUserInfoFromSecret(secret)
if err != nil {
return nil, err
}

return userinfo, nil
return cs.cacheGet(claimsId)
}

func (cs *ClusterStorage) Save(r *http.Request, w http.ResponseWriter, info *v1alpha1.UserInfo) error {
Expand All @@ -102,17 +81,11 @@ func (cs *ClusterStorage) Clear(r *http.Request, w http.ResponseWriter) error {
// malformed, do nothing
return nil
}
cs.cache.Delete(claimsId)
cs.clearClaimsIDCookie(r, w)
return cs.deleteClaimsSecret(claimsId)
}

func (cs *ClusterStorage) prepareCache() {
cs.sharedInformerFactory = informers.NewSharedInformerFactory(cs.Client, informerSyncDuration)
cs.secretLister = cs.sharedInformerFactory.Core().V1().Secrets().Lister()
cs.sharedInformerFactory.Start(cs.informerStop)
cs.sharedInformerFactory.WaitForCacheSync(cs.informerStop)
}

func (cs *ClusterStorage) createClaimsIDCookie(claimsId string, r *http.Request, w http.ResponseWriter) {
cookieData := fmt.Sprintf("%s:%s", claimsId, cs.generateHMAC(claimsId))
cookie := &http.Cookie{
Expand Down Expand Up @@ -180,7 +153,7 @@ func (cs *ClusterStorage) getUserInfoFromSecret(s *corev1.Secret) (*v1alpha1.Use
return nil, SecretError("userinfo data missing from secret")
}

var userinfo *v1alpha1.UserInfo
userinfo := &v1alpha1.UserInfo{}
if err := json.Unmarshal(data, userinfo); err != nil {
return nil, fmt.Errorf("%v: %w", SecretError("error parsing userinfo"), err)
}
Expand Down Expand Up @@ -212,20 +185,26 @@ func (cs *ClusterStorage) storeUserInfo(claimId string, info *v1alpha1.UserInfo)
return nil
}

func (cs *ClusterStorage) getSecrets() (*corev1.SecretList, error) {
return cs.Client.CoreV1().Secrets(cs.Namespace).List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=true", storage.ClaimsLabel),
})
}

func (cs *ClusterStorage) getSecretByClaim(claimsId string) (*corev1.Secret, error) {
secrets, err := cs.secretLister.Secrets(cs.Namespace).List(cs.selector)
secrets, err := cs.getSecrets()
if err != nil {
return nil, fmt.Errorf("error getting secret list: %w", err)
}

for _, s := range secrets {
for _, s := range secrets.Items {
cid, ok := s.ObjectMeta.Labels[storage.ClaimsIDLabel]
if !ok {
logger.Errorf("found managed secret not containing claimID")
continue
}
if claimsId == cid {
return s, nil
return &s, nil
}
}
return nil, SecretError("not found")
Expand All @@ -245,18 +224,23 @@ func (cs *ClusterStorage) deleteClaimsSecret(claimsId string) error {
}

func (cs *ClusterStorage) deleteExpiredSecrets() error {
secrets, err := cs.secretLister.Secrets(cs.Namespace).List(cs.selector)
secrets, err := cs.getSecrets()
if err != nil {
return fmt.Errorf("error getting secret list: %w", err)
}

for _, secret := range secrets {
if secret.DeletionTimestamp != nil {
logger.Infof("secret %s already scheduled for deletion", secret.Name)
continue
}
for _, secret := range secrets.Items {
now := time.Now().UTC()
if now.Sub(secret.CreationTimestamp.UTC().Add(cs.Lifetime)) <= 0 {
if now.Sub(secret.CreationTimestamp.UTC().Add(cs.Lifetime)) >= 0 {
claimId, ok := secret.Labels[storage.ClaimsIDLabel]
if ok {
cs.cache.Delete(claimId)
}
if secret.DeletionTimestamp != nil {
logger.Infof("secret %s already scheduled for deletion", secret.Name)
continue
}

if err := cs.Client.CoreV1().Secrets(cs.Namespace).Delete(
secret.Name, &metav1.DeleteOptions{}); err != nil {
logger.Errorf("error deleting expired secret %s/%s: %s", cs.Namespace, secret.Name, err)
Expand All @@ -265,3 +249,31 @@ func (cs *ClusterStorage) deleteExpiredSecrets() error {
}
return nil
}

func (cs *ClusterStorage) cacheGet(claimsId string) (*v1alpha1.UserInfo, error) {
userInfo := cs.cache.Get(claimsId)
if userInfo != nil {
return userInfo, nil
}

secret, err := cs.getSecretByClaim(claimsId)
if err != nil {
return nil, err
}

userInfo, err = cs.getUserInfoFromSecret(secret)
if err != nil {
return nil, err
}

cs.cache.Save(claimsId, userInfo)
return userInfo, nil
}

func (cs *ClusterStorage) cacheSave(claimsId string, userInfo *v1alpha1.UserInfo) error {
if err := cs.storeUserInfo(claimsId, userInfo); err != nil {
return err
}
cs.cache.Save(claimsId, userInfo)
return nil
}
Loading

0 comments on commit 3f26f0f

Please sign in to comment.