diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go index 4adc3b1e2e..8407d44824 100644 --- a/pkg/dispatcher/dispatcher.go +++ b/pkg/dispatcher/dispatcher.go @@ -167,18 +167,7 @@ func (d *Dispatcher) dispatch(ctx context.Context, msg amqp.Delivery, ceClient c msgBinding := rabbit.NewMessageFromDelivery(ComponentName, "", "", &msg) event, err := binding.ToEvent(cloudevents.WithEncodingBinary(ctx), msgBinding) if err != nil { - if event == nil { - logging.FromContext(ctx).Error("failed parsing event: ", err) - } else { - logging.FromContext(ctx).Warn("failed parsing event, setting knativeerrordest header and NACK-ing and re-queueing: ", err) - - // Add headers as described here: https://knative.dev/docs/eventing/event-delivery/#configuring-channel-event-delivery - event.SetExtension("knativeerrordest", d.SubscriberURL) - if err = sendToRabbitMQ(channel, dlqExchange, span, event); err != nil { - logging.FromContext(ctx).Warn("failed to send event: ", err) - } - } - + logging.FromContext(ctx).Warn("failed parsing event: ", err) if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) } @@ -296,7 +285,7 @@ func (d *Dispatcher) dispatchDLQ(ctx context.Context, msg amqp.Delivery, ceClien } if !isSuccess { - logging.FromContext(ctx).Warnf("failed to deliver to %q", d.SubscriberURL) + logging.FromContext(ctx).Warnf("failed to deliver to %q %s", d.SubscriberURL, msg) if err = msg.Ack(false); err != nil { logging.FromContext(ctx).Warn("failed to Ack event: ", err) } diff --git a/pkg/dispatcher/dispatcher_test.go b/pkg/dispatcher/dispatcher_test.go index e1aabb6d10..d53c859812 100644 --- a/pkg/dispatcher/dispatcher_test.go +++ b/pkg/dispatcher/dispatcher_test.go @@ -18,8 +18,6 @@ package dispatcher import ( "context" - v2 "github.com/cloudevents/sdk-go/v2" - "github.com/cloudevents/sdk-go/v2/binding" "io" "log" "net/http" @@ -28,6 +26,9 @@ import ( "testing" "time" + v2 "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/cloudevents/sdk-go/v2/protocol" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" amqp "github.com/rabbitmq/amqp091-go" @@ -250,7 +251,6 @@ func TestDispatcher_dispatch(t *testing.T) { WorkerCount int Reporter dispatcherstats.StatsReporter DLX bool - DLXName string } type args struct { ctx context.Context @@ -264,20 +264,6 @@ func TestDispatcher_dispatch(t *testing.T) { args args wantErr bool }{ - { - name: "knativeerrordest is in header", - fields: fields{}, - args: args{ - ctx: context.TODO(), - msg: amqp.Delivery{ - Acknowledger: &MockAcknowledger{}, - ContentType: "application/cloudevents+json", - Headers: amqp.Table{"knativeerrordest": "some-destination"}, - }, - client: MockClient{}, - channel: &channel, - }, - }, { name: "invalid event", fields: fields{}, @@ -297,7 +283,6 @@ func TestDispatcher_dispatch(t *testing.T) { name: "invalid request", fields: fields{ Reporter: &MockStatsReporter{}, - DLXName: "test", }, args: args{ ctx: context.TODO(), @@ -320,7 +305,6 @@ func TestDispatcher_dispatch(t *testing.T) { name: "valid event", fields: fields{ Reporter: &MockStatsReporter{}, - DLXName: "test", }, args: args{ ctx: context.TODO(), @@ -352,7 +336,6 @@ func TestDispatcher_dispatch(t *testing.T) { WorkerCount: tt.fields.WorkerCount, Reporter: tt.fields.Reporter, DLX: tt.fields.DLX, - DLXName: tt.fields.DLXName, } client, err := v2.NewClient(tt.args.client)