Skip to content

Commit

Permalink
[PE-1779] add retry in case of inserter failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Aviv Laufer committed Apr 8, 2024
1 parent baa447d commit 4ecf612
Showing 1 changed file with 68 additions and 17 deletions.
85 changes: 68 additions & 17 deletions agent/configmaps/configmaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (

"github.com/kvtools/valkeyrie"
"github.com/kvtools/valkeyrie/store"
cv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/retry"
)

// StoreName the name of the store.
Expand Down Expand Up @@ -113,23 +116,34 @@ func (s *Store) Put(ctx context.Context, key string, value []byte, opts *store.W
strings.ReplaceAll(
strings.ReplaceAll(filepath.Dir(key), "/", "---"), "_", "--"),
)
cm, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Get(ctx, configMapName, v1.GetOptions{})
retryableFn := func(error) bool {
return true
}

cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)
cm, err := cmClient.Get(ctx, configMapName, v1.GetOptions{})
if err != nil {
cm.ObjectMeta.Name = configMapName
cm, err = s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Create(ctx, cm, v1.CreateOptions{})
if err != nil {
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
cm, err = cmClient.Create(ctx, cm, v1.CreateOptions{})
return err
})
if retryErr != nil {
return retryErr
}
}
if cm.Data == nil {
cm.Data = map[string]string{}
}
cm.Data[filename] = string(value)
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
_, err = cmClient.Update(
ctx, cm, v1.UpdateOptions{},
)
return err
})
return retryErr

_, err = s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Update(
ctx, cm, v1.UpdateOptions{},
)
return err
}

// Get a value given its key.
Expand All @@ -141,10 +155,20 @@ func (s *Store) Get(ctx context.Context, key string, opts *store.ReadOptions) (*
strings.ReplaceAll(
strings.ReplaceAll(filepath.Dir(key), "/", "---"), "_", "--"),
)
c, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Get(ctx, configMapName, v1.GetOptions{})
logger.Debug("got configmap", slog.Any("error", err))
if err != nil {
return nil, err
cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)

retryableFn := func(error) bool {
return true
}
var c *cv1.ConfigMap
var err error
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
c, err = cmClient.Get(ctx, configMapName, v1.GetOptions{})
return err
})
logger.Debug("got configmap", slog.Any("error", retryErr))
if retryErr != nil {
return nil, retryErr
}

if value, ok := c.Data[filename]; ok {
Expand All @@ -163,14 +187,29 @@ func (s Store) Delete(ctx context.Context, key string) error {
strings.ReplaceAll(
strings.ReplaceAll(filepath.Dir(key), "/", "---"), "_", "--"),
)
cm, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Get(ctx, configMapName, v1.GetOptions{})
if err != nil {
cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)
retryableFn := func(error) bool {
return true
}
var cm *cv1.ConfigMap
var err error
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
cm, err = cmClient.Get(ctx, configMapName, v1.GetOptions{})
return err
})
if retryErr != nil {
return retryErr
}
delete(cm.Data, filename)
_, err = s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Update(
ctx, cm, v1.UpdateOptions{},
)
retryErr = retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
_, err = cmClient.Update(
ctx, cm, v1.UpdateOptions{},
)
return err
})
if retryErr != nil {
return retryErr
}
return err
}

Expand All @@ -184,7 +223,19 @@ func (s Store) Exists(ctx context.Context, key string, opts *store.ReadOptions)
func (s *Store) Watch(ctx context.Context, key string, opts *store.ReadOptions) (<-chan *store.KVPair, error) {
logger := s.logger.With(slog.String("key", key))
logger.Debug("watching key")
c, err := s.clientset.CoreV1().ConfigMaps(s.config.Namespace).Watch(ctx, v1.ListOptions{FieldSelector: ""})
cmClient := s.clientset.CoreV1().ConfigMaps(s.config.Namespace)
retryableFn := func(error) bool {
return true
}
var c watch.Interface
var err error
retryErr := retry.OnError(retry.DefaultBackoff, retryableFn, func() error {
c, err = cmClient.Watch(ctx, v1.ListOptions{FieldSelector: ""})
return err
})
if retryErr != nil {
return nil, retryErr
}
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 4ecf612

Please sign in to comment.