Skip to content

Commit

Permalink
fixed tests and removed event nil case when binding.ToEvent fails cau…
Browse files Browse the repository at this point in the history
…se its always going to be nil
  • Loading branch information
gabo1208 authored and Gabriel Freites committed Jul 27, 2023
1 parent 3589e6f commit fcfc5ad
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 33 deletions.
15 changes: 2 additions & 13 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c
msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg)
event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding)
if err != nil {
if event == nil {
logging.FromContext(ctx).Error("failed parsing event: ", err)
} else {
logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err)

// Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery
event.SetExtension("knativeerrordest", d.SubscriberURL)
if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil {
logging.FromContext(ctx).Warn("failed to send event: ", err)
}
}

logging.FromContext(ctx).Warn("failed parsing event: ", err)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
Expand Down Expand Up @@ -296,7 +285,7 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien
}

if !isSuccess {
logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL)
logging.FromContext(ctx).Warnf("failed to deliver to %q %s", d.SubscriberURL, msg)
if err = msg.Ack(false); err != nil {
logging.FromContext(ctx).Warn("failed to Ack event: ", err)
}
Expand Down
23 changes: 3 additions & 20 deletions pkg/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package dispatcher

import (
"context"
v2 "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"io"
"log"
"net/http"
Expand All @@ -28,6 +26,9 @@ import (
"testing"
"time"

v2 "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"

"github.com/cloudevents/sdk-go/v2/protocol"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
amqp "github.com/rabbitmq/amqp091-go"
Expand Down Expand Up @@ -250,7 +251,6 @@ func TestDispatcher_dispatch(t *testing.T) {
WorkerCount int
Reporter dispatcherstats.StatsReporter
DLX bool
DLXName string
}
type args struct {
ctx context.Context
Expand All @@ -264,20 +264,6 @@ func TestDispatcher_dispatch(t *testing.T) {
args args
wantErr bool
}{
{
name: "knativeerrordest is in header",
fields: fields{},
args: args{
ctx: context.TODO(),
msg: amqp.Delivery{
Acknowledger: &MockAcknowledger{},
ContentType: "application/cloudevents+json",
Headers: amqp.Table{"knativeerrordest": "some-destination"},
},
client: MockClient{},
channel: &channel,
},
},
{
name: "invalid event",
fields: fields{},
Expand All @@ -297,7 +283,6 @@ func TestDispatcher_dispatch(t *testing.T) {
name: "invalid request",
fields: fields{
Reporter: &MockStatsReporter{},
DLXName: "test",
},
args: args{
ctx: context.TODO(),
Expand All @@ -320,7 +305,6 @@ func TestDispatcher_dispatch(t *testing.T) {
name: "valid event",
fields: fields{
Reporter: &MockStatsReporter{},
DLXName: "test",
},
args: args{
ctx: context.TODO(),
Expand Down Expand Up @@ -352,7 +336,6 @@ func TestDispatcher_dispatch(t *testing.T) {
WorkerCount: tt.fields.WorkerCount,
Reporter: tt.fields.Reporter,
DLX: tt.fields.DLX,
DLXName: tt.fields.DLXName,
}

client, err := v2.NewClient(tt.args.client)
Expand Down

0 comments on commit fcfc5ad

Please sign in to comment.