Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding retry loop to update #1441

Merged
merged 13 commits into from
Sep 13, 2021
31 changes: 31 additions & 0 deletions pkg/eventing/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"time"

"k8s.io/client-go/util/retry"

apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -33,6 +35,8 @@ import (
"knative.dev/client/pkg/wait"
)

type TriggerUpdateFunc func(origSource *eventingv1.Trigger) (*eventingv1.Trigger, error)

// KnEventingClient to Eventing Sources. All methods are relative to the
// namespace specified during construction
type KnEventingClient interface {
Expand All @@ -48,6 +52,8 @@ type KnEventingClient interface {
ListTriggers(ctx context.Context) (*eventingv1.TriggerList, error)
// UpdateTrigger is used to update an instance of trigger
UpdateTrigger(ctx context.Context, trigger *eventingv1.Trigger) error
// UpdateTriggerWithRetry is used to update an instance of trigger
UpdateTriggerWithRetry(ctx context.Context, name string, updateFunc TriggerUpdateFunc, nrRetries int) error
// CreateBroker is used to create an instance of broker
CreateBroker(ctx context.Context, broker *eventingv1.Broker) error
// GetBroker is used to get an instance of broker
Expand Down Expand Up @@ -137,6 +143,31 @@ func (c *knEventingClient) UpdateTrigger(ctx context.Context, trigger *eventingv
return nil
}

func (c *knEventingClient) UpdateTriggerWithRetry(ctx context.Context, name string, updateFunc TriggerUpdateFunc, nrRetries int) error {
return updateTriggerWithRetry(ctx, c, name, updateFunc, nrRetries)
}

func updateTriggerWithRetry(ctx context.Context, c KnEventingClient, name string, updateFunc TriggerUpdateFunc, nrRetries int) error {
b := retry.DefaultRetry
b.Steps = nrRetries
err := retry.RetryOnConflict(b, func() error {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
source, err := c.GetTrigger(ctx, name)
if err != nil {
return err
}
if source.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update trigger %s because it has been marked for deletion", name)
}
updatedSource, err := updateFunc(source.DeepCopy())
if err != nil {
return err
}

return c.UpdateTrigger(ctx, updatedSource)
})
return err
}

// Return the client's namespace
func (c *knEventingClient) Namespace() string {
return c.namespace
Expand Down
4 changes: 4 additions & 0 deletions pkg/eventing/v1/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (c *MockKnEventingClient) UpdateTrigger(ctx context.Context, trigger *event
return mock.ErrorOrNil(call.Result[0])
}

func (c *MockKnEventingClient) UpdateTriggerWithRetry(ctx context.Context, name string, updateFunc TriggerUpdateFunc, nrRetries int) error {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
return updateTriggerWithRetry(ctx, c, name, updateFunc, nrRetries)
}

// CreateBroker records a call for CreateBroker with the expected error
func (sr *EventingRecorder) CreateBroker(broker interface{}, err error) {
sr.r.Add("CreateBroker", []interface{}{broker}, []interface{}{err})
Expand Down
68 changes: 34 additions & 34 deletions pkg/kn/commands/source/ping/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ import (
"fmt"

"github.com/spf13/cobra"
"k8s.io/client-go/util/retry"
"knative.dev/client/pkg/kn/commands/service"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this dependency ? Normally commands should stay independent and don't refer to each other to avoid cyclic dependency and allow them easily to be refactored/moved.

If there is some code that can be reused, I'd propose to move it to a shared package and use it from there in service and here.

Copy link
Contributor Author

@vyasgun vyasgun Sep 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because I imported the MaxUpdateRetries from service package. I will change this and create a variable in the same package.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks !


"knative.dev/client/pkg/kn/commands"
"knative.dev/client/pkg/kn/commands/flags"
sourcesv1beta2 "knative.dev/client/pkg/sources/v1beta2"
eventingsourcesv1beta2 "knative.dev/eventing/pkg/apis/sources/v1beta2"

"knative.dev/client/pkg/util"
)

Expand Down Expand Up @@ -58,46 +62,42 @@ func NewPingUpdateCommand(p *commands.KnParams) *cobra.Command {
return err
}

source, err := pingSourceClient.GetPingSource(cmd.Context(), name)
if err != nil {
return err
}
if source.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update ping source %s because it has been marked for deletion", name)
}

b := sourcesv1beta2.NewPingSourceBuilderFromExisting(source)
if cmd.Flags().Changed("schedule") {
b.Schedule(updateFlags.schedule)
}

data, dataBase64, err := getDataFields(&updateFlags)
if err != nil {
return fmt.Errorf("cannot update PingSource %q in namespace "+
"%q because: %s", name, namespace, err)
}

if cmd.Flags().Changed("data") {
b.Data(data).DataBase64(dataBase64)
}
if cmd.Flags().Changed("sink") {
destination, err := sinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
if err != nil {
return err
updateFunc := func(origSource *eventingsourcesv1beta2.PingSource) (*eventingsourcesv1beta2.PingSource, error) {
b := sourcesv1beta2.NewPingSourceBuilderFromExisting(origSource)
if cmd.Flags().Changed("schedule") {
b.Schedule(updateFlags.schedule)
}
b.Sink(*destination)
}

if cmd.Flags().Changed("ce-override") {
ceOverridesMap, err := util.MapFromArrayAllowingSingles(updateFlags.ceOverrides, "=")
data, dataBase64, err := getDataFields(&updateFlags)
if err != nil {
return err
return nil, fmt.Errorf("cannot update PingSource %q in namespace "+
"%q because: %s", name, namespace, err)
}
if cmd.Flags().Changed("data") {
b.Data(data).DataBase64(dataBase64)
}
if cmd.Flags().Changed("sink") {
destination, err := sinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
if err != nil {
return nil, err
}
b.Sink(*destination)
}
if cmd.Flags().Changed("ce-override") {
ceOverridesMap, err := util.MapFromArrayAllowingSingles(updateFlags.ceOverrides, "=")
if err != nil {
return nil, err
}
ceOverridesToRemove := util.ParseMinusSuffix(ceOverridesMap)
b.CloudEventOverrides(ceOverridesMap, ceOverridesToRemove)
}
ceOverridesToRemove := util.ParseMinusSuffix(ceOverridesMap)
b.CloudEventOverrides(ceOverridesMap, ceOverridesToRemove)
updatedSource := b.Build()
return updatedSource, nil
}
backoff := retry.DefaultRetry
backoff.Steps = service.MaxUpdateRetries
err = pingSourceClient.UpdatePingSourceWithRetry(cmd.Context(), name, updateFunc, service.MaxUpdateRetries)

err = pingSourceClient.UpdatePingSource(cmd.Context(), b.Build())
if err == nil {
fmt.Fprintf(cmd.OutOrStdout(), "Ping source '%s' updated in namespace '%s'.\n", name, pingSourceClient.Namespace())
}
Expand Down
38 changes: 12 additions & 26 deletions pkg/kn/commands/trigger/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ import (
"errors"
"fmt"

"github.com/spf13/cobra"
apierrors "k8s.io/apimachinery/pkg/api/errors"
clientv1beta1 "knative.dev/client/pkg/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

"github.com/spf13/cobra"
v1beta1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

clientv1beta1 "knative.dev/client/pkg/eventing/v1"
"knative.dev/client/pkg/kn/commands"
"knative.dev/client/pkg/kn/commands/flags"
"knative.dev/client/pkg/util"
Expand Down Expand Up @@ -69,26 +68,17 @@ func NewTriggerUpdateCommand(p *commands.KnParams) *cobra.Command {
return err
}

var retries = 0
for {
trigger, err := eventingClient.GetTrigger(cmd.Context(), name)
if err != nil {
return err
}
if trigger.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update trigger %s because it has been marked for deletion", name)
}

updateFunc := func(trigger *v1beta1.Trigger) (*v1beta1.Trigger, error) {
b := clientv1beta1.NewTriggerBuilderFromExisting(trigger)

if cmd.Flags().Changed("broker") {
return fmt.Errorf(
return nil, fmt.Errorf(
"cannot update trigger '%s' because broker is immutable", name)
}
if cmd.Flags().Changed("filter") {
updated, removed, err := triggerUpdateFlags.GetUpdateFilters()
if err != nil {
return fmt.Errorf(
return nil, fmt.Errorf(
"cannot update trigger '%s' because %w", name, err)
}
existing := extractFilters(trigger)
Expand All @@ -97,24 +87,20 @@ func NewTriggerUpdateCommand(p *commands.KnParams) *cobra.Command {
if cmd.Flags().Changed("sink") {
destination, err := sinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
if err != nil {
return err
return nil, err
}
b.Subscriber(&duckv1.Destination{
Ref: destination.Ref,
URI: destination.URI,
})
}
err = eventingClient.UpdateTrigger(cmd.Context(), b.Build())
if err != nil {
if apierrors.IsConflict(err) && retries < MaxUpdateRetries {
retries++
continue
}
return err
}
return b.Build(), nil
}
err = eventingClient.UpdateTriggerWithRetry(cmd.Context(), name, updateFunc, MaxUpdateRetries)
if err == nil {
fmt.Fprintf(cmd.OutOrStdout(), "Trigger '%s' updated in namespace '%s'.\n", name, namespace)
return nil
}
return err
},
}

Expand Down
32 changes: 14 additions & 18 deletions pkg/serving/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"time"

"k8s.io/client-go/util/retry"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -255,33 +257,27 @@ func (cl *knServingClient) UpdateServiceWithRetry(ctx context.Context, name stri

// Extracted to be usable with the Mocking client
func updateServiceWithRetry(ctx context.Context, cl KnServingClient, name string, updateFunc ServiceUpdateFunc, nrRetries int) (bool, error) {
var retries = 0
for {
var changed bool
var err error
b := retry.DefaultRetry
b.Steps = nrRetries
err = retry.RetryOnConflict(b, func() error {
service, err := cl.GetService(ctx, name)
if err != nil {
return false, err
return err
}
if service.GetDeletionTimestamp() != nil {
return false, fmt.Errorf("can't update service %s because it has been marked for deletion", name)
return fmt.Errorf("can't update service %s because it has been marked for deletion", name)
}
updatedService, err := updateFunc(service.DeepCopy())
if err != nil {
return false, err
return err
}

changed, err := cl.UpdateService(ctx, updatedService)
if err != nil {
// Retry to update when a resource version conflict exists
if apierrors.IsConflict(err) && retries < nrRetries {
retries++
// Wait a second before doing the retry
time.Sleep(time.Second)
continue
}
return false, fmt.Errorf("giving up after %d retries: %w", nrRetries, err)
}
return changed, nil
}
changed, err = cl.UpdateService(ctx, updatedService)
return err
})
return changed, err
}

// ApplyService applies a service definition that contains the service's targer state
Expand Down
32 changes: 32 additions & 0 deletions pkg/sources/v1beta2/ping_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"k8s.io/client-go/util/retry"

"k8s.io/apimachinery/pkg/runtime"
"knative.dev/client/pkg/util"
"knative.dev/eventing/pkg/client/clientset/versioned/scheme"
Expand All @@ -31,6 +33,8 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"
)

type PingSourceUpdateFunc func(origSource *sourcesv1beta2.PingSource) (*sourcesv1beta2.PingSource, error)

// Interface for interacting with a Ping source
type KnPingSourcesClient interface {

Expand All @@ -43,6 +47,9 @@ type KnPingSourcesClient interface {
// UpdatePingSource updates a Ping source
UpdatePingSource(ctx context.Context, pingSource *sourcesv1beta2.PingSource) error

// UpdatePingSourceWithRetry updates a Ping source and retries on conflict
UpdatePingSourceWithRetry(ctx context.Context, name string, updateFunc PingSourceUpdateFunc, nrRetries int) error

// DeletePingSource deletes a Ping source
DeletePingSource(ctx context.Context, name string) error

Expand Down Expand Up @@ -94,6 +101,31 @@ func (c *pingSourcesClient) UpdatePingSource(ctx context.Context, pingSource *so
return nil
}

func (c *pingSourcesClient) UpdatePingSourceWithRetry(ctx context.Context, name string, updateFunc PingSourceUpdateFunc, nrRetries int) error {
return updatePingSourceWithRetry(ctx, c, name, updateFunc, nrRetries)
}

func updatePingSourceWithRetry(ctx context.Context, c KnPingSourcesClient, name string, updateFunc PingSourceUpdateFunc, nrRetries int) error {
b := retry.DefaultRetry
b.Steps = nrRetries
err := retry.RetryOnConflict(b, func() error {
rhuss marked this conversation as resolved.
Show resolved Hide resolved
source, err := c.GetPingSource(ctx, name)
if err != nil {
return err
}
if source.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update ping source %s because it has been marked for deletion", name)
}
updatedSource, err := updateFunc(source.DeepCopy())
if err != nil {
return err
}

return c.UpdatePingSource(ctx, updatedSource)
})
return err
}

func (c *pingSourcesClient) DeletePingSource(ctx context.Context, name string) error {
err := c.client.Delete(ctx, name, metav1.DeleteOptions{})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sources/v1beta2/ping_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (c *MockKnPingSourceClient) UpdatePingSource(ctx context.Context, pingSourc
return mock.ErrorOrNil(call.Result[0])
}

func (c *MockKnPingSourceClient) UpdatePingSourceWithRetry(ctx context.Context, name string, updateFunc PingSourceUpdateFunc, nrRetries int) error {
return updatePingSourceWithRetry(ctx, c, name, updateFunc, nrRetries)
}

// UpdatePingSource records a call for DeletePingSource with the expected error (nil if none)
func (sr *PingSourcesRecorder) DeletePingSource(name interface{}, err error) {
sr.r.Add("DeletePingSource", []interface{}{name}, []interface{}{err})
Expand Down
Loading