diff --git a/pkg/lib/v0_2_0/events.go b/pkg/lib/v0_2_0/events.go index 9f879b7f..96b4f518 100644 --- a/pkg/lib/v0_2_0/events.go +++ b/pkg/lib/v0_2_0/events.go @@ -41,16 +41,27 @@ const keptnSpecVersionCEExtension = "shkeptnspecversion" const triggeredIDCEExtension = "triggeredid" const keptnGitCommitIDCEExtension = "gitcommitid" +type HTTPSenderOption func(httpSender *HTTPEventSender) + +// WithSendRetries allows to specify the number of retries that are performed if the receiver of an event returns a HTTP error code +func WithSendRetries(retries int) HTTPSenderOption { + return func(httpSender *HTTPEventSender) { + httpSender.nrRetries = retries + } +} + // HTTPEventSender sends CloudEvents via HTTP type HTTPEventSender struct { // EventsEndpoint is the http endpoint the events are sent to EventsEndpoint string // Client is an implementation of the cloudevents.Client interface Client cloudevents.Client + // nrRetries is the number of retries that are attempted if the endpoint an event is forwarded to returns an http code outside the 2xx range + nrRetries int } // NewHTTPEventSender creates a new HTTPSender -func NewHTTPEventSender(endpoint string) (*HTTPEventSender, error) { +func NewHTTPEventSender(endpoint string, opts ...HTTPSenderOption) (*HTTPEventSender, error) { if endpoint == "" { endpoint = DefaultHTTPEventEndpoint } @@ -75,6 +86,11 @@ func NewHTTPEventSender(endpoint string) (*HTTPEventSender, error) { httpSender := &HTTPEventSender{ EventsEndpoint: endpoint, Client: c, + nrRetries: MAX_SEND_RETRIES, + } + + for _, o := range opts { + o(httpSender) } return httpSender, nil } @@ -88,7 +104,7 @@ func (httpSender HTTPEventSender) Send(ctx context.Context, event cloudevents.Ev ctx = cloudevents.ContextWithTarget(ctx, httpSender.EventsEndpoint) ctx = cloudevents.WithEncodingStructured(ctx) var result protocol.Result - for i := 0; i <= MAX_SEND_RETRIES; i++ { + for i := 0; i <= httpSender.nrRetries; i++ { result = httpSender.Client.Send(ctx, event) httpResult, ok := result.(*httpprotocol.Result) switch { @@ -103,7 +119,7 @@ func (httpSender HTTPEventSender) Send(ctx context.Context, event cloudevents.Ev return nil } } - return errors.New("Failed to send cloudevent: " + result.Error()) + return fmt.Errorf("could not send cloudevent after %d retries. Received result from the receiver: %w", httpSender.nrRetries, result) } // EventSender fakes the sending of CloudEvents diff --git a/pkg/lib/v0_2_0/events_test.go b/pkg/lib/v0_2_0/events_test.go index faa46b63..21fde3f6 100644 --- a/pkg/lib/v0_2_0/events_test.go +++ b/pkg/lib/v0_2_0/events_test.go @@ -1,6 +1,7 @@ package v0_2_0 import ( + "context" "net/http" "net/http/httptest" "testing" @@ -48,6 +49,7 @@ func TestKeptn_SendCloudEventWithRetry(t *testing.T) { failOnFirstTry = false w.WriteHeader(500) w.Write([]byte(`{}`)) + return } w.WriteHeader(200) w.Write([]byte(`{}`)) @@ -55,42 +57,45 @@ func TestKeptn_SendCloudEventWithRetry(t *testing.T) { ) defer ts.Close() - type args struct { - event cloudevents.Event - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "", - fields: getKeptnFields(ts), - args: args{ - event: getTestEvent(), - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - httpSender, _ := NewHTTPEventSender(ts.URL) - k := &Keptn{ - KeptnBase: keptn.KeptnBase{ - KeptnContext: tt.fields.KeptnContext, - Event: tt.fields.KeptnBase, - EventSender: httpSender, - UseLocalFileSystem: tt.fields.useLocalFileSystem, - ResourceHandler: tt.fields.resourceHandler, - EventHandler: tt.fields.eventHandler, - }, - } - if err := k.SendCloudEvent(tt.args.event); (err != nil) != tt.wantErr { - t.Errorf("SendCloudEvent() error = %v, wantErr %v", err, tt.wantErr) + httpSender, _ := NewHTTPEventSender(ts.URL) + + err := httpSender.Send(context.TODO(), getTestEvent()) + + require.Nil(t, err) +} + +func TestKeptn_SendCloudEventWithOneRetry(t *testing.T) { + nrRequests := 0 + ts := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Add("Content-Type", "application/json") + if nrRequests < 2 { + nrRequests++ + w.WriteHeader(500) + w.Write([]byte(`{}`)) + return } - }) - } + w.WriteHeader(200) + w.Write([]byte(`{}`)) + }), + ) + defer ts.Close() + + httpSender, _ := NewHTTPEventSender(ts.URL, WithSendRetries(1)) + + err := httpSender.Send(context.TODO(), getTestEvent()) + + require.NotNil(t, err) + + // reset nrRequests + nrRequests = 0 + + //initialize a new sender with 2 retries + httpSender, _ = NewHTTPEventSender(ts.URL, WithSendRetries(5)) + + err = httpSender.Send(context.TODO(), getTestEvent()) + + require.Nil(t, err) } func getTestEvent() cloudevents.Event {