Skip to content

Commit

Permalink
[PE-1779] add retry in case of inserter failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Aviv Laufer committed Apr 9, 2024
1 parent baa447d commit 423b399
Showing 1 changed file with 72 additions and 17 deletions.
89 changes: 72 additions & 17 deletions agent/configmaps/configmaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package configmaps

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
Expand All @@ -14,10 +15,13 @@ import (

"github.com/kvtools/valkeyrie"
"github.com/kvtools/valkeyrie/store"
cv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)

// StoreName the name of the store.
Expand All @@ -36,6 +40,11 @@ type Config struct {
Create bool
}

func retryableFn(err error) bool {
slog.Default().Debug("Error from configmap API -retrying", slog.Any("error", err))
return !errors.Is(err, context.Canceled)

}
func newStore(ctx context.Context, endpoints []string, options valkeyrie.Config) (store.Store, error) {
cfg, ok := options.(*Config)
if !ok && options != nil {
Expand Down Expand Up @@ -108,28 +117,41 @@ func defaultNamespace() string {

// Put a value at the specified key.
func (s *Store) Put(ctx context.Context, key string, value []byte, opts *store.WriteOptions) error {
logger := s.logger.With(slog.String("key", key))
filename := filepath.Base(key)
configMapName := strings.ToLower(
strings.ReplaceAll(
strings.ReplaceAll(filepath.Dir(key), "/", "---"), "_", "--"),
)
cm, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Get(ctx, configMapName, v1.GetOptions{})
cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)
cm, err := cmClient.Get(ctx, configMapName, v1.GetOptions{})
if err != nil {
cm.ObjectMeta.Name = configMapName
cm, err = s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Create(ctx, cm, v1.CreateOptions{})
if err != nil {
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
cm, err = cmClient.Create(ctx, cm, v1.CreateOptions{})
return err
})
if retryErr != nil {
logger.Error("Error creating configmap - after retries", slog.Any("error", retryErr))
return retryErr
}
}
if cm.Data == nil {
cm.Data = map[string]string{}
}
cm.Data[filename] = string(value)
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
_, err = cmClient.Update(
ctx, cm, v1.UpdateOptions{},
)
return err
})
if retryErr != nil {
logger.Error("Error updating configmap - after retries", slog.Any("error", retryErr))

}
return retryErr

_, err = s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Update(
ctx, cm, v1.UpdateOptions{},
)
return err
}

// Get a value given its key.
Expand All @@ -141,10 +163,18 @@ func (s *Store) Get(ctx context.Context, key string, opts *store.ReadOptions) (*
strings.ReplaceAll(
strings.ReplaceAll(filepath.Dir(key), "/", "---"), "_", "--"),
)
c, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Get(ctx, configMapName, v1.GetOptions{})
logger.Debug("got configmap", slog.Any("error", err))
if err != nil {
return nil, err
cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)

var c *cv1.ConfigMap
var err error
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
c, err = cmClient.Get(ctx, configMapName, v1.GetOptions{})
return err
})
logger.Debug("got configmap", slog.Any("error", retryErr))
if retryErr != nil {
logger.Error("Error getting configmap - after retries", slog.Any("error", retryErr))
return nil, retryErr
}

if value, ok := c.Data[filename]; ok {
Expand All @@ -158,19 +188,34 @@ func (s *Store) Get(ctx context.Context, key string, opts *store.ReadOptions) (*

// Delete the value at the specified key.
func (s Store) Delete(ctx context.Context, key string) error {
logger := s.logger.With(slog.String("key", key))
filename := filepath.Base(key)
configMapName := strings.ToLower(
strings.ReplaceAll(
strings.ReplaceAll(filepath.Dir(key), "/", "---"), "_", "--"),
)
cm, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Get(ctx, configMapName, v1.GetOptions{})
if err != nil {
cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)
var cm *cv1.ConfigMap
var err error
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
cm, err = cmClient.Get(ctx, configMapName, v1.GetOptions{})
return err
})
if retryErr != nil {
logger.Error("Error getting configmap - after retries", slog.Any("error", retryErr))
return retryErr
}
delete(cm.Data, filename)
_, err = s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Update(
ctx, cm, v1.UpdateOptions{},
)
retryErr = retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
_, err = cmClient.Update(
ctx, cm, v1.UpdateOptions{},
)
return err
})
if retryErr != nil {
logger.Error("Error updating configmap - after retries", slog.Any("error", retryErr))
return retryErr
}
return err
}

Expand All @@ -184,7 +229,17 @@ func (s Store) Exists(ctx context.Context, key string, opts *store.ReadOptions)
func (s *Store) Watch(ctx context.Context, key string, opts *store.ReadOptions) (<-chan *store.KVPair, error) {
logger := s.logger.With(slog.String("key", key))
logger.Debug("watching key")
c, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Watch(ctx, v1.ListOptions{FieldSelector: ""})
cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)
var c watch.Interface
var err error
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
c, err = cmClient.Watch(ctx, v1.ListOptions{FieldSelector: ""})
return err
})
if retryErr != nil {
logger.Error("Error Watching configmap - after retries", slog.Any("error", retryErr))
return nil, retryErr
}
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 423b399

Please sign in to comment.